1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# -*- coding: utf-8 -*-
# test_incoming_server.py
# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Integration tests for incoming API
"""
import pytest
import json
from io import BytesIO
from uuid import uuid4
from twisted.web.test.test_web import DummyRequest
from twisted.web.server import Site
from twisted.internet import reactor
from twisted.internet import defer
import treq
from leap.soledad.server._incoming import IncomingResource
from leap.soledad.server._blobs import BlobsServerState
from leap.soledad.server._incoming import IncomingFormatter
from leap.soledad.common.crypto import EncryptionSchemes
from leap.soledad.common.blobs import Flags
from test_soledad.util import CouchServerStateForTests
from test_soledad.util import CouchDBTestCase
class IncomingOnCouchServerTestCase(CouchDBTestCase):
def setUp(self):
self.port = None
def tearDown(self):
if self.port:
self.port.stopListening()
def prepare(self, backend):
self.user_id = 'user-' + uuid4().hex
if backend == 'couch':
self.state = CouchServerStateForTests(self.couch_url)
self.state.ensure_database(self.user_id)
else:
self.state = BlobsServerState(backend)
root = IncomingResource(self.state)
site = Site(root)
self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
self.host = self.port.getHost()
self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
def test_put_incoming_creates_a_document_using_couch(self):
self.prepare('couch')
user_id, doc_id = self.user_id, uuid4().hex
content, scheme = 'Hi', EncryptionSchemes.PUBKEY
formatter = IncomingFormatter()
incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
doc = db.get_doc(doc_id)
self.assertEquals(doc.content, formatter.format(content, scheme))
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
def test_put_incoming_creates_a_blob_using_filesystem(self):
self.prepare('filesystem')
user_id, doc_id = self.user_id, uuid4().hex
content = 'Hi'
formatter = IncomingFormatter()
incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
request = DummyRequest([user_id, doc_id])
yield db.read_blob(user_id, doc_id, request, 'MX')
flags = db.get_flags(user_id, doc_id, request, 'MX')
flags = json.loads(flags)
expected = formatter.preamble(content, doc_id) + ' ' + content
self.assertEquals(expected, request.written[0])
self.assertIn(Flags.PENDING, flags)
|