diff options
| author | Kali Kaneko <kali@leap.se> | 2015-12-04 15:02:42 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2015-12-04 15:02:42 -0400 | 
| commit | fb7045bbb15af70def876775eef9eb1a2f5c6ca7 (patch) | |
| tree | 1cb0547cd42a0347e8c849e4d1e0343cf8626f9a /client/src | |
| parent | f8982e74768fd7039b543a97060701dec444a9f5 (diff) | |
| parent | 27bda0ac201e236e3a2c9671462a337f2970e993 (diff) | |
Merge branch 'develop' into debian/platform-0.8
Diffstat (limited to 'client/src')
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 20 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/send.py | 46 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/support.py | 13 | 
3 files changed, 49 insertions, 30 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 6d3c11b9..0954c1df 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object):          self._started = False      def start(self): +        if self.running: +            return          self._create_pool()          self._started = True      def stop(self): -        if not self._started: +        if not self.running:              return          self._started = False          self._destroy_pool() @@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          last_idx = self._last_inserted_idx          for doc_id, rev, content, gen, trans_id, encrypted, idx in \                  decrypted_docs: -            # XXX for some reason, a document might not have been deleted from -            #     the database. This is a bug. In this point, already -            #     processed documents should have been removed from the sync -            #     database and we should not have to skip them here. We need -            #     to find out why this is happening, fix, and remove the -            #     skipping below. -            if (idx < last_idx + 1): -                continue              if (idx != last_idx + 1):                  break              insertable.append((doc_id, rev, content, gen, trans_id, idx)) @@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)          return self._runOperation(query) +    @defer.inlineCallbacks      def _collect_async_decryption_results(self):          """          Collect the results of the asynchronous doc decryptions and re-raise @@ -773,7 +768,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          async_results = self._async_results[:]          for res in async_results:              if res.ready(): -                self._decrypt_doc_cb(res.get())  # might raise an exception! +                # XXX: might raise an exception! +                yield self._decrypt_doc_cb(res.get())                  self._async_results.remove(res)      @defer.inlineCallbacks @@ -796,7 +792,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          if processed < pending:              yield self._async_decrypt_received_docs() -            self._collect_async_decryption_results() +            yield self._collect_async_decryption_results()              docs = yield self._process_decrypted_docs()              yield self._delete_processed_docs(docs)              # recurse @@ -807,6 +803,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              self._finish()      def _finish(self): -        self._deferred.callback(None)          self._processed_docs = 0          self._last_inserted_idx = 0 +        self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 80483f0d..e8abf35b 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -29,6 +29,8 @@ class HTTPDocSender(object):      They need to be encrypted and metadata prepared before sending.      """ +    MAX_BATCH_SIZE = 0  # disabled by now, this is being tested yet +      @defer.inlineCallbacks      def _send_docs(self, docs_by_generation, last_known_generation,                     last_known_trans_id, sync_id): @@ -43,25 +45,41 @@ class HTTPDocSender(object):              sync_id=sync_id,              ensure=self._ensure_callback is not None)          total = len(docs_by_generation) -        for idx, entry in enumerate(docs_by_generation, 1): -            yield self._prepare_one_doc(entry, body, idx, total) -            result = yield self._http_request( -                self._url, -                method='POST', -                body=body.pop(1), -                content_type='application/x-soledad-sync-put') -            if self._defer_encryption: -                self._delete_sent(idx, docs_by_generation) -            _emit_send_status(idx, total) +        while body.consumed < total: +            result = yield self._send_batch(total, body, docs_by_generation)          response_dict = json.loads(result)[0]          gen_after_send = response_dict['new_generation']          trans_id_after_send = response_dict['new_transaction_id']          defer.returnValue([gen_after_send, trans_id_after_send]) -    def _delete_sent(self, idx, docs_by_generation): -        doc = docs_by_generation[idx - 1][0] -        self._sync_enc_pool.delete_encrypted_doc( -            doc.doc_id, doc.rev) +    def _delete_sent(self, docs): +        for doc, gen, trans_id in docs: +            self._sync_enc_pool.delete_encrypted_doc( +                doc.doc_id, doc.rev) + +    @defer.inlineCallbacks +    def _send_batch(self, total, body, docs): +        sent = [] +        missing = total - body.consumed +        for i in xrange(1, missing + 1): +            if body.pending_size > self.MAX_BATCH_SIZE: +                break +            idx = body.consumed + i +            entry = docs[idx - 1] +            sent.append(entry) +            yield self._prepare_one_doc(entry, body, idx, total) +        result = yield self._send_request(body.pop()) +        if self._defer_encryption: +            self._delete_sent(sent) +        _emit_send_status(body.consumed, total) +        defer.returnValue(result) + +    def _send_request(self, body): +        return self._http_request( +            self._url, +            method='POST', +            body=body, +            content_type='application/x-soledad-sync-put')      @defer.inlineCallbacks      def _prepare_one_doc(self, entry, body, idx, total): diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 44cd7089..2625744c 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -152,6 +152,8 @@ class RequestBody(object):          """          self.headers = header_dict          self.entries = [] +        self.consumed = 0 +        self.pending_size = 0      def insert_info(self, **entry_dict):          """ @@ -165,11 +167,11 @@ class RequestBody(object):          """          entry = json.dumps(entry_dict)          self.entries.append(entry) -        return len(entry) +        self.pending_size += len(entry) -    def pop(self, number=1): +    def pop(self):          """ -        Removes an amount of entries and returns it formatted and ready +        Removes all entries and returns it formatted and ready          to be sent.          :param number: number of entries to pop and format @@ -178,7 +180,10 @@ class RequestBody(object):          :return: formatted body ready to be sent          :rtype: str          """ -        entries = [self.entries.pop(0) for i in xrange(number)] +        entries = self.entries[:] +        self.entries = [] +        self.pending_size = 0 +        self.consumed += len(entries)          return self.entries_to_str(entries)      def __str__(self):  | 
