1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# Copyright 2011 Canonical Ltd.
#
# This file is part of u1db.
#
# u1db is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# u1db is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with u1db. If not, see <http://www.gnu.org/licenses/>.
"""Test sqlcipher backend internals."""
import os
import time
import threading
import unittest2 as unittest
from sqlite3 import dbapi2
from u1db import (
errors,
query_parser,
)
from leap.soledad.backends import sqlcipher as sqlite_backend
from leap.soledad.backends.leap_backend import LeapDocument
from leap.soledad.tests import u1db_tests
from leap.soledad.tests.u1db_tests.test_sqlite_backend import (
TestSQLiteDatabase,
TestSQLitePartialExpandDatabase,
)
from leap.soledad.tests.u1db_tests.test_backends import TestAlternativeDocument
PASSWORD = '123456'
class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):
def setUp(self):
super(TestSQLitePartialExpandDatabase, self).setUp()
self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
self.db._set_replica_uid('test')
def test_default_replica_uid(self):
self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._replica_uid)
self.assertEqual(32, len(self.db._replica_uid))
int(self.db._replica_uid, 16)
def test__parse_index(self):
self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
self.assertIsInstance(g, query_parser.ExtractField)
self.assertEqual(['fieldname'], g.field)
def test__update_indexes(self):
self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
c = self.db._get_sqlite_handle().cursor()
self.db._update_indexes('doc-id', {'fieldname': 'val'},
[('fieldname', g)], c)
c.execute('SELECT doc_id, field_name, value FROM document_fields')
self.assertEqual([('doc-id', 'fieldname', 'val')],
c.fetchall())
def test__set_replica_uid(self):
# Start from scratch, so that replica_uid isn't set.
self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._real_replica_uid)
self.assertIsNot(None, self.db._replica_uid)
self.db._set_replica_uid('foo')
c = self.db._get_sqlite_handle().cursor()
c.execute("SELECT value FROM u1db_config WHERE name='replica_uid'")
self.assertEqual(('foo',), c.fetchone())
self.assertEqual('foo', self.db._real_replica_uid)
self.assertEqual('foo', self.db._replica_uid)
self.db._close_sqlite_handle()
self.assertEqual('foo', self.db._replica_uid)
def test__open_database(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
sqlite_backend.SQLCipherDatabase(path, PASSWORD)
db2 = sqlite_backend.SQLCipherDatabase._open_database(path, PASSWORD)
self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase)
def test__open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
sqlite_backend.SQLCipherDatabase(path, PASSWORD)
db2 = sqlite_backend.SQLCipherDatabase._open_database(
path, PASSWORD, document_factory=TestAlternativeDocument)
self.assertEqual(TestAlternativeDocument, db2._factory)
def test_open_database_existing(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
sqlite_backend.SQLCipherDatabase(path, PASSWORD)
db2 = sqlite_backend.SQLCipherDatabase.open_database(path, PASSWORD, create=False)
self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase)
def test_open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
sqlite_backend.SQLCipherDatabase(path, PASSWORD)
db2 = sqlite_backend.SQLCipherDatabase.open_database(
path, PASSWORD, create=False, document_factory=TestAlternativeDocument)
self.assertEqual(TestAlternativeDocument, db2._factory)
def test_create_database_initializes_schema(self):
raw_db = self.db._get_sqlite_handle()
c = raw_db.cursor()
c.execute("SELECT * FROM u1db_config")
config = dict([(r[0], r[1]) for r in c.fetchall()])
self.assertEqual({'sql_schema': '0', 'replica_uid': 'test',
'index_storage': 'expand referenced encrypted'}, config)
|