diff options
| -rw-r--r-- | client/src/leap/soledad/client/http_target/__init__.py | 5 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/api.py | 12 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 1 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/send.py | 64 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/send_protocol.py | 61 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/support.py | 27 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sync.py | 24 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/__init__.py | 8 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 5 | ||||
| -rw-r--r-- | testing/tests/perf/conftest.py | 2 | 
10 files changed, 135 insertions, 74 deletions
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 62e8bcf0..94de2feb 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -26,6 +26,8 @@ import os  from leap.soledad.common.log import getLogger  from leap.common.http import HTTPClient +from twisted.web.client import HTTPConnectionPool +from twisted.internet import reactor  from leap.soledad.client.http_target.send import HTTPDocSender  from leap.soledad.client.http_target.api import SyncTargetAPI  from leap.soledad.client.http_target.fetch import HTTPDocFetcher @@ -99,7 +101,8 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):          # XXX Increasing timeout of simple requests to avoid chances of hitting          # the duplicated syncing bug. This could be reduced to the 30s default          # after implementing Cancellable Sync. See #7382 -        self._http = HTTPClient(cert_file, timeout=90) +        self._http = HTTPClient(cert_file, timeout=90, +                                pool=HTTPConnectionPool(reactor))          if DO_STATS:              self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 4e068523..00b943e1 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -22,6 +22,7 @@ from uuid import uuid4  from twisted.web.error import Error  from twisted.internet import defer +from twisted.web.http_headers import Headers  from leap.soledad.client.http_target.support import readBody  from leap.soledad.common.errors import InvalidAuthTokenError @@ -72,11 +73,18 @@ class SyncTargetAPI(SyncTarget):          return self._sync_enc_pool is not None      def _http_request(self, url, method='GET', body=None, headers=None, -                      content_type=None, body_reader=readBody): +                      content_type=None, body_reader=readBody, +                      body_producer=None):          headers = headers or self._base_header          if content_type:              headers.update({'content-type': [content_type]}) -        d = self._http.request(url, method, body, headers, body_reader) +        if not body_producer: +            d = self._http.request(url, method, body, headers, body_reader) +        else: +            d = self._http._agent.request( +                method, url, headers=Headers(headers), +                bodyProducer=body_producer(body)) +            d.addCallback(body_reader)          d.addErrback(_unauth_to_invalid_token_error)          return d diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 063082e5..50e89a2a 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -23,7 +23,6 @@ from leap.soledad.client.http_target.support import RequestBody  from leap.soledad.common.log import getLogger  from leap.soledad.common.document import SoledadDocument  from leap.soledad.common.l2db import errors -from datetime import datetime  from . import fetch_protocol diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..fcda9bd7 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger  from leap.soledad.client.events import emit_async  from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS  from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer  logger = getLogger(__name__) @@ -54,73 +55,56 @@ class HTTPDocSender(object):              last_known_trans_id=last_known_trans_id,              sync_id=sync_id,              ensure=self._ensure_callback is not None) -        total = len(docs_by_generation) -        while body.consumed < total: -            result = yield self._send_batch(total, body, docs_by_generation) +        result = yield self._send_batch(body, docs_by_generation)          response_dict = json.loads(result)[0]          gen_after_send = response_dict['new_generation']          trans_id_after_send = response_dict['new_transaction_id']          defer.returnValue([gen_after_send, trans_id_after_send]) -    def _delete_sent(self, docs): -        for doc, gen, trans_id in docs: -            self._sync_enc_pool.delete_encrypted_doc( -                doc.doc_id, doc.rev) -      @defer.inlineCallbacks -    def _send_batch(self, total, body, docs): -        sent = [] +    def _send_batch(self, body, docs): +        total = len(docs)          missing = total - body.consumed +        calls = []          for i in xrange(1, missing + 1): -            if body.pending_size > self.MAX_BATCH_SIZE: -                break              idx = body.consumed + i              entry = docs[idx - 1] -            sent.append(entry) -            yield self._prepare_one_doc(entry, body, idx, total) -        result = yield self._send_request(body.pop()) -        if self._defer_encryption: -            self._delete_sent(sent) +            calls.append((self._prepare_one_doc, +                         entry, body, idx, total)) +        result = yield self._send_request(body, calls)          _emit_send_status(self.uuid, body.consumed, total)          defer.returnValue(result) -    def _send_request(self, body): +    def _send_request(self, body, calls):          return self._http_request(              self._url,              method='POST', -            body=body, -            content_type='application/x-soledad-sync-put') +            body=(body, calls), +            content_type='application/x-soledad-sync-put', +            body_producer=DocStreamProducer)      @defer.inlineCallbacks      def _prepare_one_doc(self, entry, body, idx, total): -        doc, gen, trans_id = entry -        content = yield self._encrypt_doc(doc) +        get_doc, gen, trans_id = entry +        doc, content = yield self._encrypt_doc(get_doc)          body.insert_info(              id=doc.doc_id, rev=doc.rev, content=content, gen=gen,              trans_id=trans_id, number_of_docs=total,              doc_idx=idx) -    def _encrypt_doc(self, doc): -        d = None +    @defer.inlineCallbacks +    def _encrypt_doc(self, get_doc): +        if type(get_doc) == tuple: +            f, args = get_doc +            doc = yield f(args) +        else: +            # tests +            doc = get_doc          if doc.is_tombstone(): -            d = defer.succeed(None) -        elif not self._defer_encryption: -            # fallback case, for tests -            d = defer.succeed(self._crypto.encrypt_doc(doc)) +            defer.returnValue((doc, None))          else: - -            def _maybe_encrypt_doc_inline(doc_json): -                if doc_json is None: -                    # the document is not marked as tombstone, but we got -                    # nothing from the sync db. As it is not encrypted -                    # yet, we force inline encryption. -                    return self._crypto.encrypt_doc(doc) -                return doc_json - -            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) -            d.addCallback(_maybe_encrypt_doc_inline) -        return d +            defer.returnValue((doc, self._crypto.encrypt_doc(doc)))  def _emit_send_status(user_data, idx, total): diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..c72c6d13 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,61 @@ +from zope.interface import implements +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +class DocStreamProducer(object): +    """ +    A producer that writes the body of a request to a consumer. +    """ + +    implements(IBodyProducer) + +    def __init__(self, parser_producer): +        """ +        Initialize the string produer. + +        :param body: The body of the request. +        :type body: str +        """ +        self.body, self.producer = parser_producer +        self.length = UNKNOWN_LENGTH +        self.pause = False +        self.stop = False + +    @defer.inlineCallbacks +    def startProducing(self, consumer): +        """ +        Write the body to the consumer. + +        :param consumer: Any IConsumer provider. +        :type consumer: twisted.internet.interfaces.IConsumer + +        :return: A successful deferred. +        :rtype: twisted.internet.defer.Deferred +        """ +        call = self.producer.pop(0) +        yield call[0](*call[1:]) +        while self.producer and not self.stop: +            if self.pause: +                yield self.sleep(0.01) +                continue +            call = self.producer.pop(0) +            yield call[0](*call[1:]) +            consumer.write(self.body.pop(1)) +        consumer.write(self.body.pop(1)) + +    def sleep(self, secs): +        d = defer.Deferred() +        reactor.callLater(secs, d.callback, None) +        return d + +    def pauseProducing(self): +        self.pause = True + +    def stopProducing(self): +        self.stop = True + +    def resumeProducing(self): +        self.pause = False diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 6ec98ed4..40e5eb55 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -155,7 +155,6 @@ class RequestBody(object):          self.headers = header_dict          self.entries = []          self.consumed = 0 -        self.pending_size = 0      def insert_info(self, **entry_dict):          """ @@ -169,9 +168,8 @@ class RequestBody(object):          """          entry = json.dumps(entry_dict)          self.entries.append(entry) -        self.pending_size += len(entry) -    def pop(self): +    def pop(self, amount=10):          """          Removes all entries and returns it formatted and ready          to be sent. @@ -182,19 +180,20 @@ class RequestBody(object):          :return: formatted body ready to be sent          :rtype: str          """ -        entries = self.entries[:] -        self.entries = [] -        self.pending_size = 0 -        self.consumed += len(entries) -        return self.entries_to_str(entries) +        start = self.consumed == 0 +        amount = min([len(self.entries), amount]) +        entries = [self.entries.pop(0) for i in xrange(amount)] +        self.consumed += amount +        end = len(self.entries) == 0 +        return self.entries_to_str(entries, start, end)      def __str__(self): -        return self.entries_to_str(self.entries) +        return self.pop(len(self.entries))      def __len__(self):          return len(self.entries) -    def entries_to_str(self, entries=None): +    def entries_to_str(self, entries=None, start=True, end=True):          """          Format a list of entries into the body format expected          by the server. @@ -205,6 +204,10 @@ class RequestBody(object):          :return: formatted body ready to be sent          :rtype: str          """ -        data = '[\r\n' + json.dumps(self.headers) +        data = '' +        if start: +            data = '[\r\n' + json.dumps(self.headers)          data += ''.join(',\r\n' + entry for entry in entries) -        return data + '\r\n]' +        if end: +            data += '\r\n]' +        return data diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3cfe029..9d237d98 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -142,17 +142,21 @@ class SoledadSynchronizer(Synchronizer):          # --------------------------------------------------------------------          # prepare to send all the changed docs -        changed_doc_ids = [doc_id for doc_id, _, _ in changes] -        docs_to_send = self.source.get_docs( -            changed_doc_ids, check_for_conflicts=False, include_deleted=True) -        ids_sent = [] +        # changed_doc_ids = [doc_id for doc_id, _, _ in changes] +        # docs_to_send = self.source.get_docs( +        #     changed_doc_ids, check_for_conflicts=False, include_deleted=True) +        ids_sent = [doc_id for doc_id, _, _ in changes] +        # docs_by_generation = [] +        # idx = 0 +        # for doc in docs_to_send: +        #     _, gen, trans = changes[idx] +        #     docs_by_generation.append((doc, gen, trans)) +        #     idx += 1 +        #     ids_sent.append(doc.doc_id)          docs_by_generation = [] -        idx = 0 -        for doc in docs_to_send: -            _, gen, trans = changes[idx] -            docs_by_generation.append((doc, gen, trans)) -            idx += 1 -            ids_sent.append(doc.doc_id) +        for doc_id, gen, trans in changes: +            get_doc = (self.source.get_doc, doc_id) +            docs_by_generation.append((get_doc, gen, trans))          # exchange documents and try to insert the returned ones with          # the target, return target synced-up-to gen. diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index d8243c19..889bf48f 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody(              try:                  content_length = int(self.environ['CONTENT_LENGTH'])              except (ValueError, KeyError): -                raise http_app.BadRequest +                # raise http_app.BadRequest +                content_length = self.max_request_size              if content_length <= 0:                  raise http_app.BadRequest              if content_length > self.max_request_size: @@ -219,7 +220,6 @@ class HTTPInvocationByMethodWithBody(                  if content_type == 'application/x-soledad-sync-put':                      meth_put = self._lookup('%s_put' % method)                      meth_end = self._lookup('%s_end' % method) -                    entries = []                      while True:                          line = body_getline()                          entry = line.strip() @@ -228,11 +228,9 @@ class HTTPInvocationByMethodWithBody(                          if not entry or not comma:  # empty or no prec comma                              raise http_app.BadRequest                          entry, comma = utils.check_and_strip_comma(entry) -                        entries.append(entry) +                        meth_put({}, entry)                      if comma or body_getline():  # extra comma or data                          raise http_app.BadRequest -                    for entry in entries: -                        meth_put({}, entry)                      return meth_end()                  # handle outgoing documents                  elif content_type == 'application/x-soledad-sync-get': diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index c958bfaa..0bf7b236 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -237,7 +237,9 @@ class SyncResource(http_app.SyncResource):          :type doc_idx: int          """          doc = Document(id, rev, content) -        self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) +        self.sync_exch.insert_doc_from_source( +            doc, gen, trans_id, number_of_docs=None, +            doc_idx=None, sync_id=None)      @http_app.http_method(received=int, content_as_args=True)      def post_get(self, received): @@ -282,7 +284,6 @@ class SyncResource(http_app.SyncResource):          Return the current generation and transaction_id after inserting one          incoming document.          """ -        self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)          self.responder.content_type = 'application/x-soledad-sync-response'          self.responder.start_response(200)          self.responder.start_stream(), diff --git a/testing/tests/perf/conftest.py b/testing/tests/perf/conftest.py index 6fa6b2c0..09567b88 100644 --- a/testing/tests/perf/conftest.py +++ b/testing/tests/perf/conftest.py @@ -243,7 +243,7 @@ def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request):              server_url=server_url,              cert_file=None,              auth_token=token, -            defer_encryption=True) +            defer_encryption=False)          request.addfinalizer(soledad_client.close)          return soledad_client      return create  | 
