summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py5
-rw-r--r--client/src/leap/soledad/client/http_target/api.py12
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py1
-rw-r--r--client/src/leap/soledad/client/http_target/send.py64
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py61
-rw-r--r--client/src/leap/soledad/client/http_target/support.py27
-rw-r--r--client/src/leap/soledad/client/sync.py24
7 files changed, 128 insertions, 66 deletions
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
index 62e8bcf0..94de2feb 100644
--- a/client/src/leap/soledad/client/http_target/__init__.py
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -26,6 +26,8 @@ import os
from leap.soledad.common.log import getLogger
from leap.common.http import HTTPClient
+from twisted.web.client import HTTPConnectionPool
+from twisted.internet import reactor
from leap.soledad.client.http_target.send import HTTPDocSender
from leap.soledad.client.http_target.api import SyncTargetAPI
from leap.soledad.client.http_target.fetch import HTTPDocFetcher
@@ -99,7 +101,8 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
# XXX Increasing timeout of simple requests to avoid chances of hitting
# the duplicated syncing bug. This could be reduced to the 30s default
# after implementing Cancellable Sync. See #7382
- self._http = HTTPClient(cert_file, timeout=90)
+ self._http = HTTPClient(cert_file, timeout=90,
+ pool=HTTPConnectionPool(reactor))
if DO_STATS:
self.sync_exchange_phase = [0]
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index 4e068523..00b943e1 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -22,6 +22,7 @@ from uuid import uuid4
from twisted.web.error import Error
from twisted.internet import defer
+from twisted.web.http_headers import Headers
from leap.soledad.client.http_target.support import readBody
from leap.soledad.common.errors import InvalidAuthTokenError
@@ -72,11 +73,18 @@ class SyncTargetAPI(SyncTarget):
return self._sync_enc_pool is not None
def _http_request(self, url, method='GET', body=None, headers=None,
- content_type=None, body_reader=readBody):
+ content_type=None, body_reader=readBody,
+ body_producer=None):
headers = headers or self._base_header
if content_type:
headers.update({'content-type': [content_type]})
- d = self._http.request(url, method, body, headers, body_reader)
+ if not body_producer:
+ d = self._http.request(url, method, body, headers, body_reader)
+ else:
+ d = self._http._agent.request(
+ method, url, headers=Headers(headers),
+ bodyProducer=body_producer(body))
+ d.addCallback(body_reader)
d.addErrback(_unauth_to_invalid_token_error)
return d
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 063082e5..50e89a2a 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -23,7 +23,6 @@ from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.l2db import errors
-from datetime import datetime
from . import fetch_protocol
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index c7bd057e..fcda9bd7 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger
from leap.soledad.client.events import emit_async
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
logger = getLogger(__name__)
@@ -54,73 +55,56 @@ class HTTPDocSender(object):
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- total = len(docs_by_generation)
- while body.consumed < total:
- result = yield self._send_batch(total, body, docs_by_generation)
+ result = yield self._send_batch(body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, docs):
- for doc, gen, trans_id in docs:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
@defer.inlineCallbacks
- def _send_batch(self, total, body, docs):
- sent = []
+ def _send_batch(self, body, docs):
+ total = len(docs)
missing = total - body.consumed
+ calls = []
for i in xrange(1, missing + 1):
- if body.pending_size > self.MAX_BATCH_SIZE:
- break
idx = body.consumed + i
entry = docs[idx - 1]
- sent.append(entry)
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._send_request(body.pop())
- if self._defer_encryption:
- self._delete_sent(sent)
+ calls.append((self._prepare_one_doc,
+ entry, body, idx, total))
+ result = yield self._send_request(body, calls)
_emit_send_status(self.uuid, body.consumed, total)
defer.returnValue(result)
- def _send_request(self, body):
+ def _send_request(self, body, calls):
return self._http_request(
self._url,
method='POST',
- body=body,
- content_type='application/x-soledad-sync-put')
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
- doc, gen, trans_id = entry
- content = yield self._encrypt_doc(doc)
+ get_doc, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc)
body.insert_info(
id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
trans_id=trans_id, number_of_docs=total,
doc_idx=idx)
- def _encrypt_doc(self, doc):
- d = None
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc):
+ if type(get_doc) == tuple:
+ f, args = get_doc
+ doc = yield f(args)
+ else:
+ # tests
+ doc = get_doc
if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
+ defer.returnValue((doc, None))
else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
+ defer.returnValue((doc, self._crypto.encrypt_doc(doc)))
def _emit_send_status(user_data, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..c72c6d13
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,61 @@
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+class DocStreamProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, parser_producer):
+ """
+ Initialize the string produer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body, self.producer = parser_producer
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ call = self.producer.pop(0)
+ yield call[0](*call[1:])
+ while self.producer and not self.stop:
+ if self.pause:
+ yield self.sleep(0.01)
+ continue
+ call = self.producer.pop(0)
+ yield call[0](*call[1:])
+ consumer.write(self.body.pop(1))
+ consumer.write(self.body.pop(1))
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 6ec98ed4..40e5eb55 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -155,7 +155,6 @@ class RequestBody(object):
self.headers = header_dict
self.entries = []
self.consumed = 0
- self.pending_size = 0
def insert_info(self, **entry_dict):
"""
@@ -169,9 +168,8 @@ class RequestBody(object):
"""
entry = json.dumps(entry_dict)
self.entries.append(entry)
- self.pending_size += len(entry)
- def pop(self):
+ def pop(self, amount=10):
"""
Removes all entries and returns it formatted and ready
to be sent.
@@ -182,19 +180,20 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- entries = self.entries[:]
- self.entries = []
- self.pending_size = 0
- self.consumed += len(entries)
- return self.entries_to_str(entries)
+ start = self.consumed == 0
+ amount = min([len(self.entries), amount])
+ entries = [self.entries.pop(0) for i in xrange(amount)]
+ self.consumed += amount
+ end = len(self.entries) == 0
+ return self.entries_to_str(entries, start, end)
def __str__(self):
- return self.entries_to_str(self.entries)
+ return self.pop(len(self.entries))
def __len__(self):
return len(self.entries)
- def entries_to_str(self, entries=None):
+ def entries_to_str(self, entries=None, start=True, end=True):
"""
Format a list of entries into the body format expected
by the server.
@@ -205,6 +204,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- data = '[\r\n' + json.dumps(self.headers)
+ data = ''
+ if start:
+ data = '[\r\n' + json.dumps(self.headers)
data += ''.join(',\r\n' + entry for entry in entries)
- return data + '\r\n]'
+ if end:
+ data += '\r\n]'
+ return data
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3cfe029..9d237d98 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -142,17 +142,21 @@ class SoledadSynchronizer(Synchronizer):
# --------------------------------------------------------------------
# prepare to send all the changed docs
- changed_doc_ids = [doc_id for doc_id, _, _ in changes]
- docs_to_send = self.source.get_docs(
- changed_doc_ids, check_for_conflicts=False, include_deleted=True)
- ids_sent = []
+ # changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+ # docs_to_send = self.source.get_docs(
+ # changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+ ids_sent = [doc_id for doc_id, _, _ in changes]
+ # docs_by_generation = []
+ # idx = 0
+ # for doc in docs_to_send:
+ # _, gen, trans = changes[idx]
+ # docs_by_generation.append((doc, gen, trans))
+ # idx += 1
+ # ids_sent.append(doc.doc_id)
docs_by_generation = []
- idx = 0
- for doc in docs_to_send:
- _, gen, trans = changes[idx]
- docs_by_generation.append((doc, gen, trans))
- idx += 1
- ids_sent.append(doc.doc_id)
+ for doc_id, gen, trans in changes:
+ get_doc = (self.source.get_doc, doc_id)
+ docs_by_generation.append((get_doc, gen, trans))
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.