blob: 7055a765a1878c0e7d5596455ceb7ec147a5b72e (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# -*- coding: utf-8 -*-
import json
from twisted.internet.defer import inlineCallbacks
from leap.soledad.client.encdecpool import SyncEncrypterPool
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
# Identifier and revision shared by the test document and the pool lookups.
DOC_ID = "mydoc"
DOC_REV = "rev"
# Payload of the test document; it is serialized with json.dumps before use.
DOC_CONTENT = dict(simple='document')
class TestSyncEncrypterPool(BaseSoledadTest):
    """
    Exercise SyncEncrypterPool: looking up a document that was never queued,
    and queueing a document for encryption and fetching it afterwards.
    """

    def setUp(self):
        """
        Run the base fixture, then create and start a pool built from the
        crypto object and sync db held by ``self._soledad``.
        """
        BaseSoledadTest.setUp(self)
        soledad = self._soledad
        self._pool = SyncEncrypterPool(soledad._crypto, soledad._sync_db)
        self._pool.start()

    def tearDown(self):
        """
        Stop the pool before handing control to the base teardown.
        """
        self._pool.stop()
        BaseSoledadTest.tearDown(self)

    @inlineCallbacks
    def test_get_encrypted_doc_returns_none(self):
        """
        Asking the pool for a document that was never queued for encryption
        must yield None.
        """
        result = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
        self.assertIsNone(result)

    @inlineCallbacks
    def test_encrypt_doc_and_get_it_back(self):
        """
        A document passed to ``encrypt_doc`` can afterwards be retrieved
        from the pool, i.e. the lookup no longer yields None.
        """
        serialized = json.dumps(DOC_CONTENT)
        plain_doc = SoledadDocument(
            doc_id=DOC_ID, rev=DOC_REV, json=serialized)
        yield self._pool.encrypt_doc(plain_doc)
        fetched = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
        self.assertIsNotNone(fetched)
|