diff options
Diffstat (limited to 'u1db/tests/test_c_backend.py')
-rw-r--r-- | u1db/tests/test_c_backend.py | 634 |
1 files changed, 634 insertions, 0 deletions
diff --git a/u1db/tests/test_c_backend.py b/u1db/tests/test_c_backend.py new file mode 100644 index 00000000..bdd2aec7 --- /dev/null +++ b/u1db/tests/test_c_backend.py @@ -0,0 +1,634 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +try: + import simplejson as json +except ImportError: + import json # noqa +from u1db import ( + Document, + errors, + tests, + ) +from u1db.tests import c_backend_wrapper, c_backend_error +from u1db.tests.test_remote_sync_target import ( + make_http_app, + make_oauth_http_app + ) + + +class TestCDatabaseExists(tests.TestCase): + + def test_c_backend_compiled(self): + if c_backend_wrapper is None: + self.fail("Could not import the c_backend_wrapper module." + " Was it compiled properly?\n%s" % (c_backend_error,)) + + +# Rather than lots of failing tests, we have the above check to test that the +# module exists, and all these tests just get skipped +class BackendTests(tests.TestCase): + + def setUp(self): + super(BackendTests, self).setUp() + if c_backend_wrapper is None: + self.skipTest("The c_backend_wrapper could not be imported") + + +class TestCDatabase(BackendTests): + + def test_exists(self): + if c_backend_wrapper is None: + self.fail("Could not import the c_backend_wrapper module." + " Was it compiled properly?") + db = c_backend_wrapper.CDatabase(':memory:') + self.assertEqual(':memory:', db._filename) + + def test__is_closed(self): + db = c_backend_wrapper.CDatabase(':memory:') + self.assertTrue(db._sql_is_open()) + db.close() + self.assertFalse(db._sql_is_open()) + + def test__run_sql(self): + db = c_backend_wrapper.CDatabase(':memory:') + self.assertTrue(db._sql_is_open()) + self.assertEqual([], db._run_sql('CREATE TABLE test (id INTEGER)')) + self.assertEqual([], db._run_sql('INSERT INTO test VALUES (1)')) + self.assertEqual([('1',)], db._run_sql('SELECT * FROM test')) + + def test__get_generation(self): + db = c_backend_wrapper.CDatabase(':memory:') + self.assertEqual(0, db._get_generation()) + db.create_doc_from_json(tests.simple_doc) + self.assertEqual(1, db._get_generation()) + + def test__get_generation_info(self): + db = c_backend_wrapper.CDatabase(':memory:') + self.assertEqual((0, ''), db._get_generation_info()) + db.create_doc_from_json(tests.simple_doc) + info = db._get_generation_info() + self.assertEqual(1, info[0]) + self.assertTrue(info[1].startswith('T-')) + + def test__set_replica_uid(self): + db = c_backend_wrapper.CDatabase(':memory:') + self.assertIsNot(None, db._replica_uid) + db._set_replica_uid('foo') + self.assertEqual([('foo',)], db._run_sql( + "SELECT value FROM u1db_config WHERE name='replica_uid'")) + + def test_default_replica_uid(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.assertIsNot(None, self.db._replica_uid) + self.assertEqual(32, len(self.db._replica_uid)) + # casting to an int from the uid *is* the check for correct behavior. + int(self.db._replica_uid, 16) + + def test_get_conflicts_with_borked_data(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + # We add an entry to conflicts, but not to documents, which is an + # invalid situation + self.db._run_sql("INSERT INTO conflicts" + " VALUES ('doc-id', 'doc-rev', '{}')") + self.assertRaises(Exception, self.db.get_doc_conflicts, 'doc-id') + + def test_create_index_list(self): + # We manually poke data into the DB, so that we test just the "get_doc" + # code, rather than also testing the index management code. + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.simple_doc) + self.db.create_index_list("key-idx", ["key"]) + docs = self.db.get_from_index('key-idx', 'value') + self.assertEqual([doc], docs) + + def test_create_index_list_on_non_ascii_field_name(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(json.dumps({u'\xe5': 'value'})) + self.db.create_index_list('test-idx', [u'\xe5']) + self.assertEqual([doc], self.db.get_from_index('test-idx', 'value')) + + def test_list_indexes_with_non_ascii_field_names(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_index_list('test-idx', [u'\xe5']) + self.assertEqual( + [('test-idx', [u'\xe5'])], self.db.list_indexes()) + + def test_create_index_evaluates_it(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.simple_doc) + self.db.create_index_list('test-idx', ['key']) + self.assertEqual([doc], self.db.get_from_index('test-idx', 'value')) + + def test_wildcard_matches_unicode_value(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(json.dumps({"key": u"valu\xe5"})) + self.db.create_index_list('test-idx', ['key']) + self.assertEqual([doc], self.db.get_from_index('test-idx', '*')) + + def test_create_index_fails_if_name_taken(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_index_list('test-idx', ['key']) + self.assertRaises(errors.IndexNameTakenError, + self.db.create_index_list, + 'test-idx', ['stuff']) + + def test_create_index_does_not_fail_if_name_taken_with_same_index(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_index_list('test-idx', ['key']) + self.db.create_index_list('test-idx', ['key']) + self.assertEqual([('test-idx', ['key'])], self.db.list_indexes()) + + def test_create_index_after_deleting_document(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db.create_doc_from_json(tests.simple_doc) + self.db.delete_doc(doc2) + self.db.create_index_list('test-idx', ['key']) + self.assertEqual([doc], self.db.get_from_index('test-idx', 'value')) + + def test_get_from_index(self): + # We manually poke data into the DB, so that we test just the "get_doc" + # code, rather than also testing the index management code. + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.simple_doc) + self.db.create_index("key-idx", "key") + docs = self.db.get_from_index('key-idx', 'value') + self.assertEqual([doc], docs) + + def test_get_from_index_list(self): + # We manually poke data into the DB, so that we test just the "get_doc" + # code, rather than also testing the index management code. + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.simple_doc) + self.db.create_index("key-idx", "key") + docs = self.db.get_from_index_list('key-idx', ['value']) + self.assertEqual([doc], docs) + + def test_get_from_index_list_multi(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + content = '{"key": "value", "key2": "value2"}' + doc = self.db.create_doc_from_json(content) + self.db.create_index('test-idx', 'key', 'key2') + self.assertEqual( + [doc], + self.db.get_from_index_list('test-idx', ['value', 'value2'])) + + def test_get_from_index_list_multi_ordered(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc1 = self.db.create_doc_from_json( + '{"key": "value3", "key2": "value4"}') + doc2 = self.db.create_doc_from_json( + '{"key": "value2", "key2": "value3"}') + doc3 = self.db.create_doc_from_json( + '{"key": "value2", "key2": "value2"}') + doc4 = self.db.create_doc_from_json( + '{"key": "value1", "key2": "value1"}') + self.db.create_index('test-idx', 'key', 'key2') + self.assertEqual( + [doc4, doc3, doc2, doc1], + self.db.get_from_index_list('test-idx', ['v*', '*'])) + + def test_get_from_index_2(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + doc = self.db.create_doc_from_json(tests.nested_doc) + self.db.create_index("multi-idx", "key", "sub.doc") + docs = self.db.get_from_index('multi-idx', 'value', 'underneath') + self.assertEqual([doc], docs) + + def test_get_index_keys(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_doc_from_json(tests.simple_doc) + self.db.create_index("key-idx", "key") + keys = self.db.get_index_keys('key-idx') + self.assertEqual([("value",)], keys) + + def test__query_init_one_field(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_index("key-idx", "key") + query = self.db._query_init("key-idx") + self.assertEqual("key-idx", query.index_name) + self.assertEqual(1, query.num_fields) + self.assertEqual(["key"], query.fields) + + def test__query_init_two_fields(self): + self.db = c_backend_wrapper.CDatabase(':memory:') + self.db.create_index("two-idx", "key", "key2") + query = self.db._query_init("two-idx") + self.assertEqual("two-idx", query.index_name) + self.assertEqual(2, query.num_fields) + self.assertEqual(["key", "key2"], query.fields) + + def assertFormatQueryEquals(self, expected, wildcards, fields): + val, w = c_backend_wrapper._format_query(fields) + self.assertEqual(expected, val) + self.assertEqual(wildcards, w) + + def test__format_query(self): + self.assertFormatQueryEquals( + "SELECT d0.doc_id FROM document_fields d0" + " WHERE d0.field_name = ? AND d0.value = ? ORDER BY d0.value", + [0], ["1"]) + self.assertFormatQueryEquals( + "SELECT d0.doc_id" + " FROM document_fields d0, document_fields d1" + " WHERE d0.field_name = ? AND d0.value = ?" + " AND d0.doc_id = d1.doc_id" + " AND d1.field_name = ? AND d1.value = ?" + " ORDER BY d0.value, d1.value", + [0, 0], ["1", "2"]) + self.assertFormatQueryEquals( + "SELECT d0.doc_id" + " FROM document_fields d0, document_fields d1, document_fields d2" + " WHERE d0.field_name = ? AND d0.value = ?" + " AND d0.doc_id = d1.doc_id" + " AND d1.field_name = ? AND d1.value = ?" + " AND d0.doc_id = d2.doc_id" + " AND d2.field_name = ? AND d2.value = ?" + " ORDER BY d0.value, d1.value, d2.value", + [0, 0, 0], ["1", "2", "3"]) + + def test__format_query_wildcard(self): + self.assertFormatQueryEquals( + "SELECT d0.doc_id FROM document_fields d0" + " WHERE d0.field_name = ? AND d0.value NOT NULL ORDER BY d0.value", + [1], ["*"]) + self.assertFormatQueryEquals( + "SELECT d0.doc_id" + " FROM document_fields d0, document_fields d1" + " WHERE d0.field_name = ? AND d0.value = ?" + " AND d0.doc_id = d1.doc_id" + " AND d1.field_name = ? AND d1.value NOT NULL" + " ORDER BY d0.value, d1.value", + [0, 1], ["1", "*"]) + + def test__format_query_glob(self): + self.assertFormatQueryEquals( + "SELECT d0.doc_id FROM document_fields d0" + " WHERE d0.field_name = ? AND d0.value GLOB ? ORDER BY d0.value", + [2], ["1*"]) + + +class TestCSyncTarget(BackendTests): + + def setUp(self): + super(TestCSyncTarget, self).setUp() + self.db = c_backend_wrapper.CDatabase(':memory:') + self.st = self.db.get_sync_target() + + def test_attached_to_db(self): + self.assertEqual( + self.db._replica_uid, self.st.get_sync_info("misc")[0]) + + def test_get_sync_exchange(self): + exc = self.st._get_sync_exchange("source-uid", 10) + self.assertIsNot(None, exc) + + def test_sync_exchange_insert_doc_from_source(self): + exc = self.st._get_sync_exchange("source-uid", 5) + doc = c_backend_wrapper.make_document('doc-id', 'replica:1', + tests.simple_doc) + self.assertEqual([], exc.get_seen_ids()) + exc.insert_doc_from_source(doc, 10, 'T-sid') + self.assertGetDoc(self.db, 'doc-id', 'replica:1', tests.simple_doc, + False) + self.assertEqual( + (10, 'T-sid'), self.db._get_replica_gen_and_trans_id('source-uid')) + self.assertEqual(['doc-id'], exc.get_seen_ids()) + + def test_sync_exchange_conflicted_doc(self): + doc = self.db.create_doc_from_json(tests.simple_doc) + exc = self.st._get_sync_exchange("source-uid", 5) + doc2 = c_backend_wrapper.make_document(doc.doc_id, 'replica:1', + tests.nested_doc) + self.assertEqual([], exc.get_seen_ids()) + # The insert should be rejected and the doc_id not considered 'seen' + exc.insert_doc_from_source(doc2, 10, 'T-sid') + self.assertGetDoc( + self.db, doc.doc_id, doc.rev, tests.simple_doc, False) + self.assertEqual([], exc.get_seen_ids()) + + def test_sync_exchange_find_doc_ids(self): + doc = self.db.create_doc_from_json(tests.simple_doc) + exc = self.st._get_sync_exchange("source-uid", 0) + self.assertEqual(0, exc.target_gen) + exc.find_doc_ids_to_return() + doc_id = exc.get_doc_ids_to_return()[0] + self.assertEqual( + (doc.doc_id, 1), doc_id[:-1]) + self.assertTrue(doc_id[-1].startswith('T-')) + self.assertEqual(1, exc.target_gen) + + def test_sync_exchange_find_doc_ids_not_including_recently_inserted(self): + doc1 = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db.create_doc_from_json(tests.nested_doc) + exc = self.st._get_sync_exchange("source-uid", 0) + doc3 = c_backend_wrapper.make_document(doc1.doc_id, + doc1.rev + "|zreplica:2", tests.simple_doc) + exc.insert_doc_from_source(doc3, 10, 'T-sid') + exc.find_doc_ids_to_return() + self.assertEqual( + (doc2.doc_id, 2), exc.get_doc_ids_to_return()[0][:-1]) + self.assertEqual(3, exc.target_gen) + + def test_sync_exchange_return_docs(self): + returned = [] + + def return_doc_cb(doc, gen, trans_id): + returned.append((doc, gen, trans_id)) + + doc1 = self.db.create_doc_from_json(tests.simple_doc) + exc = self.st._get_sync_exchange("source-uid", 0) + exc.find_doc_ids_to_return() + exc.return_docs(return_doc_cb) + self.assertEqual((doc1, 1), returned[0][:-1]) + + def test_sync_exchange_doc_ids(self): + doc1 = self.db.create_doc_from_json(tests.simple_doc, doc_id='doc-1') + db2 = c_backend_wrapper.CDatabase(':memory:') + doc2 = db2.create_doc_from_json(tests.nested_doc, doc_id='doc-2') + returned = [] + + def return_doc_cb(doc, gen, trans_id): + returned.append((doc, gen, trans_id)) + + val = self.st.sync_exchange_doc_ids( + db2, [(doc2.doc_id, 1, 'T-sid')], 0, None, return_doc_cb) + last_trans_id = self.db._get_transaction_log()[-1][1] + self.assertEqual(2, self.db._get_generation()) + self.assertEqual((2, last_trans_id), val) + self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc, + False) + self.assertEqual((doc1, 1), returned[0][:-1]) + + +class TestCHTTPSyncTarget(BackendTests): + + def test_format_sync_url(self): + target = c_backend_wrapper.create_http_sync_target("http://base_url") + self.assertEqual("http://base_url/sync-from/replica-uid", + c_backend_wrapper._format_sync_url(target, "replica-uid")) + + def test_format_sync_url_escapes(self): + # The base_url should not get munged (we assume it is already a + # properly formed URL), but the replica-uid should get properly escaped + target = c_backend_wrapper.create_http_sync_target( + "http://host/base%2Ctest/") + self.assertEqual("http://host/base%2Ctest/sync-from/replica%2Cuid", + c_backend_wrapper._format_sync_url(target, "replica,uid")) + + def test_format_refuses_non_http(self): + db = c_backend_wrapper.CDatabase(':memory:') + target = db.get_sync_target() + self.assertRaises(RuntimeError, + c_backend_wrapper._format_sync_url, target, 'replica,uid') + + def test_oauth_credentials(self): + target = c_backend_wrapper.create_oauth_http_sync_target( + "http://host/base%2Ctest/", + 'consumer-key', 'consumer-secret', 'token-key', 'token-secret') + auth = c_backend_wrapper._get_oauth_authorization(target, + "GET", "http://host/base%2Ctest/sync-from/abcd-efg") + self.assertIsNot(None, auth) + self.assertTrue(auth.startswith('Authorization: OAuth realm="", ')) + self.assertNotIn('http://host/base', auth) + self.assertIn('oauth_nonce="', auth) + self.assertIn('oauth_timestamp="', auth) + self.assertIn('oauth_consumer_key="consumer-key"', auth) + self.assertIn('oauth_signature_method="HMAC-SHA1"', auth) + self.assertIn('oauth_version="1.0"', auth) + self.assertIn('oauth_token="token-key"', auth) + self.assertIn('oauth_signature="', auth) + + +class TestSyncCtoHTTPViaC(tests.TestCaseWithServer): + + make_app_with_state = staticmethod(make_http_app) + + def setUp(self): + super(TestSyncCtoHTTPViaC, self).setUp() + if c_backend_wrapper is None: + self.skipTest("The c_backend_wrapper could not be imported") + self.startServer() + + def test_trivial_sync(self): + mem_db = self.request_state._create_database('test.db') + mem_doc = mem_db.create_doc_from_json(tests.nested_doc) + url = self.getURL('test.db') + target = c_backend_wrapper.create_http_sync_target(url) + db = c_backend_wrapper.CDatabase(':memory:') + doc = db.create_doc_from_json(tests.simple_doc) + c_backend_wrapper.sync_db_to_target(db, target) + self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False) + self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(), + False) + + def test_unavailable(self): + mem_db = self.request_state._create_database('test.db') + mem_db.create_doc_from_json(tests.nested_doc) + tries = [] + + def wrapper(instance, *args, **kwargs): + tries.append(None) + raise errors.Unavailable + + mem_db.whats_changed = wrapper + url = self.getURL('test.db') + target = c_backend_wrapper.create_http_sync_target(url) + db = c_backend_wrapper.CDatabase(':memory:') + db.create_doc_from_json(tests.simple_doc) + self.assertRaises( + errors.Unavailable, c_backend_wrapper.sync_db_to_target, db, + target) + self.assertEqual(5, len(tries)) + + def test_unavailable_then_available(self): + mem_db = self.request_state._create_database('test.db') + mem_doc = mem_db.create_doc_from_json(tests.nested_doc) + orig_whatschanged = mem_db.whats_changed + tries = [] + + def wrapper(instance, *args, **kwargs): + if len(tries) < 1: + tries.append(None) + raise errors.Unavailable + return orig_whatschanged(instance, *args, **kwargs) + + mem_db.whats_changed = wrapper + url = self.getURL('test.db') + target = c_backend_wrapper.create_http_sync_target(url) + db = c_backend_wrapper.CDatabase(':memory:') + doc = db.create_doc_from_json(tests.simple_doc) + c_backend_wrapper.sync_db_to_target(db, target) + self.assertEqual(1, len(tries)) + self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False) + self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(), + False) + + def test_db_sync(self): + mem_db = self.request_state._create_database('test.db') + mem_doc = mem_db.create_doc_from_json(tests.nested_doc) + url = self.getURL('test.db') + db = c_backend_wrapper.CDatabase(':memory:') + doc = db.create_doc_from_json(tests.simple_doc) + local_gen_before_sync = db.sync(url) + gen, _, changes = db.whats_changed(local_gen_before_sync) + self.assertEqual(1, len(changes)) + self.assertEqual(mem_doc.doc_id, changes[0][0]) + self.assertEqual(1, gen - local_gen_before_sync) + self.assertEqual(1, local_gen_before_sync) + self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False) + self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(), + False) + + +class TestSyncCtoOAuthHTTPViaC(tests.TestCaseWithServer): + + make_app_with_state = staticmethod(make_oauth_http_app) + + def setUp(self): + super(TestSyncCtoOAuthHTTPViaC, self).setUp() + if c_backend_wrapper is None: + self.skipTest("The c_backend_wrapper could not be imported") + self.startServer() + + def test_trivial_sync(self): + mem_db = self.request_state._create_database('test.db') + mem_doc = mem_db.create_doc_from_json(tests.nested_doc) + url = self.getURL('~/test.db') + target = c_backend_wrapper.create_oauth_http_sync_target(url, + tests.consumer1.key, tests.consumer1.secret, + tests.token1.key, tests.token1.secret) + db = c_backend_wrapper.CDatabase(':memory:') + doc = db.create_doc_from_json(tests.simple_doc) + c_backend_wrapper.sync_db_to_target(db, target) + self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False) + self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(), + False) + + +class TestVectorClock(BackendTests): + + def create_vcr(self, rev): + return c_backend_wrapper.VectorClockRev(rev) + + def test_parse_empty(self): + self.assertEqual('VectorClockRev()', + repr(self.create_vcr(''))) + + def test_parse_invalid(self): + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('x'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('x:a'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:a'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('x:a|y:1'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:2a'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1||'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:2|'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:2|:'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:2|m:'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|x:|m:3'))) + self.assertEqual('VectorClockRev(None)', + repr(self.create_vcr('y:1|:|m:3'))) + + def test_parse_single(self): + self.assertEqual('VectorClockRev(test:1)', + repr(self.create_vcr('test:1'))) + + def test_parse_multi(self): + self.assertEqual('VectorClockRev(test:1|z:2)', + repr(self.create_vcr('test:1|z:2'))) + self.assertEqual('VectorClockRev(ab:1|bc:2|cd:3|de:4|ef:5)', + repr(self.create_vcr('ab:1|bc:2|cd:3|de:4|ef:5'))) + self.assertEqual('VectorClockRev(a:2|b:1)', + repr(self.create_vcr('b:1|a:2'))) + + +class TestCDocument(BackendTests): + + def make_document(self, *args, **kwargs): + return c_backend_wrapper.make_document(*args, **kwargs) + + def test_create(self): + self.make_document('doc-id', 'uid:1', tests.simple_doc) + + def assertPyDocEqualCDoc(self, *args, **kwargs): + cdoc = self.make_document(*args, **kwargs) + pydoc = Document(*args, **kwargs) + self.assertEqual(pydoc, cdoc) + self.assertEqual(cdoc, pydoc) + + def test_cmp_to_pydoc_equal(self): + self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc) + self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc, + has_conflicts=False) + self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc, + has_conflicts=True) + + def test_cmp_to_pydoc_not_equal_conflicts(self): + cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc) + pydoc = Document('doc-id', 'uid:1', tests.simple_doc, + has_conflicts=True) + self.assertNotEqual(cdoc, pydoc) + self.assertNotEqual(pydoc, cdoc) + + def test_cmp_to_pydoc_not_equal_doc_id(self): + cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc) + pydoc = Document('doc2-id', 'uid:1', tests.simple_doc) + self.assertNotEqual(cdoc, pydoc) + self.assertNotEqual(pydoc, cdoc) + + def test_cmp_to_pydoc_not_equal_doc_rev(self): + cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc) + pydoc = Document('doc-id', 'uid:2', tests.simple_doc) + self.assertNotEqual(cdoc, pydoc) + self.assertNotEqual(pydoc, cdoc) + + def test_cmp_to_pydoc_not_equal_content(self): + cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc) + pydoc = Document('doc-id', 'uid:1', tests.nested_doc) + self.assertNotEqual(cdoc, pydoc) + self.assertNotEqual(pydoc, cdoc) + + +class TestUUID(BackendTests): + + def test_uuid4_conformance(self): + uuids = set() + for i in range(20): + uuid = c_backend_wrapper.generate_hex_uuid() + self.assertIsInstance(uuid, str) + self.assertEqual(32, len(uuid)) + # This will raise ValueError if it isn't a valid hex string + long(uuid, 16) + # Version 4 uuids have 2 other requirements, the high 4 bits of the + # seventh byte are always '0x4', and the middle bits of byte 9 are + # always set + self.assertEqual('4', uuid[12]) + self.assertTrue(uuid[16] in '89ab') + self.assertTrue(uuid not in uuids) + uuids.add(uuid) |