summaryrefslogtreecommitdiff
path: root/u1db/tests/test_c_backend.py
diff options
context:
space:
mode:
Diffstat (limited to 'u1db/tests/test_c_backend.py')
-rw-r--r--u1db/tests/test_c_backend.py634
1 files changed, 0 insertions, 634 deletions
diff --git a/u1db/tests/test_c_backend.py b/u1db/tests/test_c_backend.py
deleted file mode 100644
index bdd2aec7..00000000
--- a/u1db/tests/test_c_backend.py
+++ /dev/null
@@ -1,634 +0,0 @@
-# Copyright 2011-2012 Canonical Ltd.
-#
-# This file is part of u1db.
-#
-# u1db is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation.
-#
-# u1db is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with u1db. If not, see <http://www.gnu.org/licenses/>.
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-from u1db import (
- Document,
- errors,
- tests,
- )
-from u1db.tests import c_backend_wrapper, c_backend_error
-from u1db.tests.test_remote_sync_target import (
- make_http_app,
- make_oauth_http_app
- )
-
-
-class TestCDatabaseExists(tests.TestCase):
-
- def test_c_backend_compiled(self):
- if c_backend_wrapper is None:
- self.fail("Could not import the c_backend_wrapper module."
- " Was it compiled properly?\n%s" % (c_backend_error,))
-
-
-# Rather than lots of failing tests, we have the above check to test that the
-# module exists, and all these tests just get skipped
-class BackendTests(tests.TestCase):
-
- def setUp(self):
- super(BackendTests, self).setUp()
- if c_backend_wrapper is None:
- self.skipTest("The c_backend_wrapper could not be imported")
-
-
-class TestCDatabase(BackendTests):
-
- def test_exists(self):
- if c_backend_wrapper is None:
- self.fail("Could not import the c_backend_wrapper module."
- " Was it compiled properly?")
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertEqual(':memory:', db._filename)
-
- def test__is_closed(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertTrue(db._sql_is_open())
- db.close()
- self.assertFalse(db._sql_is_open())
-
- def test__run_sql(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertTrue(db._sql_is_open())
- self.assertEqual([], db._run_sql('CREATE TABLE test (id INTEGER)'))
- self.assertEqual([], db._run_sql('INSERT INTO test VALUES (1)'))
- self.assertEqual([('1',)], db._run_sql('SELECT * FROM test'))
-
- def test__get_generation(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertEqual(0, db._get_generation())
- db.create_doc_from_json(tests.simple_doc)
- self.assertEqual(1, db._get_generation())
-
- def test__get_generation_info(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertEqual((0, ''), db._get_generation_info())
- db.create_doc_from_json(tests.simple_doc)
- info = db._get_generation_info()
- self.assertEqual(1, info[0])
- self.assertTrue(info[1].startswith('T-'))
-
- def test__set_replica_uid(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- self.assertIsNot(None, db._replica_uid)
- db._set_replica_uid('foo')
- self.assertEqual([('foo',)], db._run_sql(
- "SELECT value FROM u1db_config WHERE name='replica_uid'"))
-
- def test_default_replica_uid(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.assertIsNot(None, self.db._replica_uid)
- self.assertEqual(32, len(self.db._replica_uid))
- # casting to an int from the uid *is* the check for correct behavior.
- int(self.db._replica_uid, 16)
-
- def test_get_conflicts_with_borked_data(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- # We add an entry to conflicts, but not to documents, which is an
- # invalid situation
- self.db._run_sql("INSERT INTO conflicts"
- " VALUES ('doc-id', 'doc-rev', '{}')")
- self.assertRaises(Exception, self.db.get_doc_conflicts, 'doc-id')
-
- def test_create_index_list(self):
- # We manually poke data into the DB, so that we test just the "get_doc"
- # code, rather than also testing the index management code.
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.db.create_index_list("key-idx", ["key"])
- docs = self.db.get_from_index('key-idx', 'value')
- self.assertEqual([doc], docs)
-
- def test_create_index_list_on_non_ascii_field_name(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(json.dumps({u'\xe5': 'value'}))
- self.db.create_index_list('test-idx', [u'\xe5'])
- self.assertEqual([doc], self.db.get_from_index('test-idx', 'value'))
-
- def test_list_indexes_with_non_ascii_field_names(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_index_list('test-idx', [u'\xe5'])
- self.assertEqual(
- [('test-idx', [u'\xe5'])], self.db.list_indexes())
-
- def test_create_index_evaluates_it(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.db.create_index_list('test-idx', ['key'])
- self.assertEqual([doc], self.db.get_from_index('test-idx', 'value'))
-
- def test_wildcard_matches_unicode_value(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(json.dumps({"key": u"valu\xe5"}))
- self.db.create_index_list('test-idx', ['key'])
- self.assertEqual([doc], self.db.get_from_index('test-idx', '*'))
-
- def test_create_index_fails_if_name_taken(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_index_list('test-idx', ['key'])
- self.assertRaises(errors.IndexNameTakenError,
- self.db.create_index_list,
- 'test-idx', ['stuff'])
-
- def test_create_index_does_not_fail_if_name_taken_with_same_index(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_index_list('test-idx', ['key'])
- self.db.create_index_list('test-idx', ['key'])
- self.assertEqual([('test-idx', ['key'])], self.db.list_indexes())
-
- def test_create_index_after_deleting_document(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.simple_doc)
- doc2 = self.db.create_doc_from_json(tests.simple_doc)
- self.db.delete_doc(doc2)
- self.db.create_index_list('test-idx', ['key'])
- self.assertEqual([doc], self.db.get_from_index('test-idx', 'value'))
-
- def test_get_from_index(self):
- # We manually poke data into the DB, so that we test just the "get_doc"
- # code, rather than also testing the index management code.
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.db.create_index("key-idx", "key")
- docs = self.db.get_from_index('key-idx', 'value')
- self.assertEqual([doc], docs)
-
- def test_get_from_index_list(self):
- # We manually poke data into the DB, so that we test just the "get_doc"
- # code, rather than also testing the index management code.
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.db.create_index("key-idx", "key")
- docs = self.db.get_from_index_list('key-idx', ['value'])
- self.assertEqual([doc], docs)
-
- def test_get_from_index_list_multi(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- content = '{"key": "value", "key2": "value2"}'
- doc = self.db.create_doc_from_json(content)
- self.db.create_index('test-idx', 'key', 'key2')
- self.assertEqual(
- [doc],
- self.db.get_from_index_list('test-idx', ['value', 'value2']))
-
- def test_get_from_index_list_multi_ordered(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc1 = self.db.create_doc_from_json(
- '{"key": "value3", "key2": "value4"}')
- doc2 = self.db.create_doc_from_json(
- '{"key": "value2", "key2": "value3"}')
- doc3 = self.db.create_doc_from_json(
- '{"key": "value2", "key2": "value2"}')
- doc4 = self.db.create_doc_from_json(
- '{"key": "value1", "key2": "value1"}')
- self.db.create_index('test-idx', 'key', 'key2')
- self.assertEqual(
- [doc4, doc3, doc2, doc1],
- self.db.get_from_index_list('test-idx', ['v*', '*']))
-
- def test_get_from_index_2(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- doc = self.db.create_doc_from_json(tests.nested_doc)
- self.db.create_index("multi-idx", "key", "sub.doc")
- docs = self.db.get_from_index('multi-idx', 'value', 'underneath')
- self.assertEqual([doc], docs)
-
- def test_get_index_keys(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_doc_from_json(tests.simple_doc)
- self.db.create_index("key-idx", "key")
- keys = self.db.get_index_keys('key-idx')
- self.assertEqual([("value",)], keys)
-
- def test__query_init_one_field(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_index("key-idx", "key")
- query = self.db._query_init("key-idx")
- self.assertEqual("key-idx", query.index_name)
- self.assertEqual(1, query.num_fields)
- self.assertEqual(["key"], query.fields)
-
- def test__query_init_two_fields(self):
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.db.create_index("two-idx", "key", "key2")
- query = self.db._query_init("two-idx")
- self.assertEqual("two-idx", query.index_name)
- self.assertEqual(2, query.num_fields)
- self.assertEqual(["key", "key2"], query.fields)
-
- def assertFormatQueryEquals(self, expected, wildcards, fields):
- val, w = c_backend_wrapper._format_query(fields)
- self.assertEqual(expected, val)
- self.assertEqual(wildcards, w)
-
- def test__format_query(self):
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id FROM document_fields d0"
- " WHERE d0.field_name = ? AND d0.value = ? ORDER BY d0.value",
- [0], ["1"])
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id"
- " FROM document_fields d0, document_fields d1"
- " WHERE d0.field_name = ? AND d0.value = ?"
- " AND d0.doc_id = d1.doc_id"
- " AND d1.field_name = ? AND d1.value = ?"
- " ORDER BY d0.value, d1.value",
- [0, 0], ["1", "2"])
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id"
- " FROM document_fields d0, document_fields d1, document_fields d2"
- " WHERE d0.field_name = ? AND d0.value = ?"
- " AND d0.doc_id = d1.doc_id"
- " AND d1.field_name = ? AND d1.value = ?"
- " AND d0.doc_id = d2.doc_id"
- " AND d2.field_name = ? AND d2.value = ?"
- " ORDER BY d0.value, d1.value, d2.value",
- [0, 0, 0], ["1", "2", "3"])
-
- def test__format_query_wildcard(self):
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id FROM document_fields d0"
- " WHERE d0.field_name = ? AND d0.value NOT NULL ORDER BY d0.value",
- [1], ["*"])
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id"
- " FROM document_fields d0, document_fields d1"
- " WHERE d0.field_name = ? AND d0.value = ?"
- " AND d0.doc_id = d1.doc_id"
- " AND d1.field_name = ? AND d1.value NOT NULL"
- " ORDER BY d0.value, d1.value",
- [0, 1], ["1", "*"])
-
- def test__format_query_glob(self):
- self.assertFormatQueryEquals(
- "SELECT d0.doc_id FROM document_fields d0"
- " WHERE d0.field_name = ? AND d0.value GLOB ? ORDER BY d0.value",
- [2], ["1*"])
-
-
-class TestCSyncTarget(BackendTests):
-
- def setUp(self):
- super(TestCSyncTarget, self).setUp()
- self.db = c_backend_wrapper.CDatabase(':memory:')
- self.st = self.db.get_sync_target()
-
- def test_attached_to_db(self):
- self.assertEqual(
- self.db._replica_uid, self.st.get_sync_info("misc")[0])
-
- def test_get_sync_exchange(self):
- exc = self.st._get_sync_exchange("source-uid", 10)
- self.assertIsNot(None, exc)
-
- def test_sync_exchange_insert_doc_from_source(self):
- exc = self.st._get_sync_exchange("source-uid", 5)
- doc = c_backend_wrapper.make_document('doc-id', 'replica:1',
- tests.simple_doc)
- self.assertEqual([], exc.get_seen_ids())
- exc.insert_doc_from_source(doc, 10, 'T-sid')
- self.assertGetDoc(self.db, 'doc-id', 'replica:1', tests.simple_doc,
- False)
- self.assertEqual(
- (10, 'T-sid'), self.db._get_replica_gen_and_trans_id('source-uid'))
- self.assertEqual(['doc-id'], exc.get_seen_ids())
-
- def test_sync_exchange_conflicted_doc(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- exc = self.st._get_sync_exchange("source-uid", 5)
- doc2 = c_backend_wrapper.make_document(doc.doc_id, 'replica:1',
- tests.nested_doc)
- self.assertEqual([], exc.get_seen_ids())
- # The insert should be rejected and the doc_id not considered 'seen'
- exc.insert_doc_from_source(doc2, 10, 'T-sid')
- self.assertGetDoc(
- self.db, doc.doc_id, doc.rev, tests.simple_doc, False)
- self.assertEqual([], exc.get_seen_ids())
-
- def test_sync_exchange_find_doc_ids(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- exc = self.st._get_sync_exchange("source-uid", 0)
- self.assertEqual(0, exc.target_gen)
- exc.find_doc_ids_to_return()
- doc_id = exc.get_doc_ids_to_return()[0]
- self.assertEqual(
- (doc.doc_id, 1), doc_id[:-1])
- self.assertTrue(doc_id[-1].startswith('T-'))
- self.assertEqual(1, exc.target_gen)
-
- def test_sync_exchange_find_doc_ids_not_including_recently_inserted(self):
- doc1 = self.db.create_doc_from_json(tests.simple_doc)
- doc2 = self.db.create_doc_from_json(tests.nested_doc)
- exc = self.st._get_sync_exchange("source-uid", 0)
- doc3 = c_backend_wrapper.make_document(doc1.doc_id,
- doc1.rev + "|zreplica:2", tests.simple_doc)
- exc.insert_doc_from_source(doc3, 10, 'T-sid')
- exc.find_doc_ids_to_return()
- self.assertEqual(
- (doc2.doc_id, 2), exc.get_doc_ids_to_return()[0][:-1])
- self.assertEqual(3, exc.target_gen)
-
- def test_sync_exchange_return_docs(self):
- returned = []
-
- def return_doc_cb(doc, gen, trans_id):
- returned.append((doc, gen, trans_id))
-
- doc1 = self.db.create_doc_from_json(tests.simple_doc)
- exc = self.st._get_sync_exchange("source-uid", 0)
- exc.find_doc_ids_to_return()
- exc.return_docs(return_doc_cb)
- self.assertEqual((doc1, 1), returned[0][:-1])
-
- def test_sync_exchange_doc_ids(self):
- doc1 = self.db.create_doc_from_json(tests.simple_doc, doc_id='doc-1')
- db2 = c_backend_wrapper.CDatabase(':memory:')
- doc2 = db2.create_doc_from_json(tests.nested_doc, doc_id='doc-2')
- returned = []
-
- def return_doc_cb(doc, gen, trans_id):
- returned.append((doc, gen, trans_id))
-
- val = self.st.sync_exchange_doc_ids(
- db2, [(doc2.doc_id, 1, 'T-sid')], 0, None, return_doc_cb)
- last_trans_id = self.db._get_transaction_log()[-1][1]
- self.assertEqual(2, self.db._get_generation())
- self.assertEqual((2, last_trans_id), val)
- self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc,
- False)
- self.assertEqual((doc1, 1), returned[0][:-1])
-
-
-class TestCHTTPSyncTarget(BackendTests):
-
- def test_format_sync_url(self):
- target = c_backend_wrapper.create_http_sync_target("http://base_url")
- self.assertEqual("http://base_url/sync-from/replica-uid",
- c_backend_wrapper._format_sync_url(target, "replica-uid"))
-
- def test_format_sync_url_escapes(self):
- # The base_url should not get munged (we assume it is already a
- # properly formed URL), but the replica-uid should get properly escaped
- target = c_backend_wrapper.create_http_sync_target(
- "http://host/base%2Ctest/")
- self.assertEqual("http://host/base%2Ctest/sync-from/replica%2Cuid",
- c_backend_wrapper._format_sync_url(target, "replica,uid"))
-
- def test_format_refuses_non_http(self):
- db = c_backend_wrapper.CDatabase(':memory:')
- target = db.get_sync_target()
- self.assertRaises(RuntimeError,
- c_backend_wrapper._format_sync_url, target, 'replica,uid')
-
- def test_oauth_credentials(self):
- target = c_backend_wrapper.create_oauth_http_sync_target(
- "http://host/base%2Ctest/",
- 'consumer-key', 'consumer-secret', 'token-key', 'token-secret')
- auth = c_backend_wrapper._get_oauth_authorization(target,
- "GET", "http://host/base%2Ctest/sync-from/abcd-efg")
- self.assertIsNot(None, auth)
- self.assertTrue(auth.startswith('Authorization: OAuth realm="", '))
- self.assertNotIn('http://host/base', auth)
- self.assertIn('oauth_nonce="', auth)
- self.assertIn('oauth_timestamp="', auth)
- self.assertIn('oauth_consumer_key="consumer-key"', auth)
- self.assertIn('oauth_signature_method="HMAC-SHA1"', auth)
- self.assertIn('oauth_version="1.0"', auth)
- self.assertIn('oauth_token="token-key"', auth)
- self.assertIn('oauth_signature="', auth)
-
-
-class TestSyncCtoHTTPViaC(tests.TestCaseWithServer):
-
- make_app_with_state = staticmethod(make_http_app)
-
- def setUp(self):
- super(TestSyncCtoHTTPViaC, self).setUp()
- if c_backend_wrapper is None:
- self.skipTest("The c_backend_wrapper could not be imported")
- self.startServer()
-
- def test_trivial_sync(self):
- mem_db = self.request_state._create_database('test.db')
- mem_doc = mem_db.create_doc_from_json(tests.nested_doc)
- url = self.getURL('test.db')
- target = c_backend_wrapper.create_http_sync_target(url)
- db = c_backend_wrapper.CDatabase(':memory:')
- doc = db.create_doc_from_json(tests.simple_doc)
- c_backend_wrapper.sync_db_to_target(db, target)
- self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False)
- self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(),
- False)
-
- def test_unavailable(self):
- mem_db = self.request_state._create_database('test.db')
- mem_db.create_doc_from_json(tests.nested_doc)
- tries = []
-
- def wrapper(instance, *args, **kwargs):
- tries.append(None)
- raise errors.Unavailable
-
- mem_db.whats_changed = wrapper
- url = self.getURL('test.db')
- target = c_backend_wrapper.create_http_sync_target(url)
- db = c_backend_wrapper.CDatabase(':memory:')
- db.create_doc_from_json(tests.simple_doc)
- self.assertRaises(
- errors.Unavailable, c_backend_wrapper.sync_db_to_target, db,
- target)
- self.assertEqual(5, len(tries))
-
- def test_unavailable_then_available(self):
- mem_db = self.request_state._create_database('test.db')
- mem_doc = mem_db.create_doc_from_json(tests.nested_doc)
- orig_whatschanged = mem_db.whats_changed
- tries = []
-
- def wrapper(instance, *args, **kwargs):
- if len(tries) < 1:
- tries.append(None)
- raise errors.Unavailable
- return orig_whatschanged(instance, *args, **kwargs)
-
- mem_db.whats_changed = wrapper
- url = self.getURL('test.db')
- target = c_backend_wrapper.create_http_sync_target(url)
- db = c_backend_wrapper.CDatabase(':memory:')
- doc = db.create_doc_from_json(tests.simple_doc)
- c_backend_wrapper.sync_db_to_target(db, target)
- self.assertEqual(1, len(tries))
- self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False)
- self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(),
- False)
-
- def test_db_sync(self):
- mem_db = self.request_state._create_database('test.db')
- mem_doc = mem_db.create_doc_from_json(tests.nested_doc)
- url = self.getURL('test.db')
- db = c_backend_wrapper.CDatabase(':memory:')
- doc = db.create_doc_from_json(tests.simple_doc)
- local_gen_before_sync = db.sync(url)
- gen, _, changes = db.whats_changed(local_gen_before_sync)
- self.assertEqual(1, len(changes))
- self.assertEqual(mem_doc.doc_id, changes[0][0])
- self.assertEqual(1, gen - local_gen_before_sync)
- self.assertEqual(1, local_gen_before_sync)
- self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False)
- self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(),
- False)
-
-
-class TestSyncCtoOAuthHTTPViaC(tests.TestCaseWithServer):
-
- make_app_with_state = staticmethod(make_oauth_http_app)
-
- def setUp(self):
- super(TestSyncCtoOAuthHTTPViaC, self).setUp()
- if c_backend_wrapper is None:
- self.skipTest("The c_backend_wrapper could not be imported")
- self.startServer()
-
- def test_trivial_sync(self):
- mem_db = self.request_state._create_database('test.db')
- mem_doc = mem_db.create_doc_from_json(tests.nested_doc)
- url = self.getURL('~/test.db')
- target = c_backend_wrapper.create_oauth_http_sync_target(url,
- tests.consumer1.key, tests.consumer1.secret,
- tests.token1.key, tests.token1.secret)
- db = c_backend_wrapper.CDatabase(':memory:')
- doc = db.create_doc_from_json(tests.simple_doc)
- c_backend_wrapper.sync_db_to_target(db, target)
- self.assertGetDoc(mem_db, doc.doc_id, doc.rev, doc.get_json(), False)
- self.assertGetDoc(db, mem_doc.doc_id, mem_doc.rev, mem_doc.get_json(),
- False)
-
-
-class TestVectorClock(BackendTests):
-
- def create_vcr(self, rev):
- return c_backend_wrapper.VectorClockRev(rev)
-
- def test_parse_empty(self):
- self.assertEqual('VectorClockRev()',
- repr(self.create_vcr('')))
-
- def test_parse_invalid(self):
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('x')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('x:a')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:a')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('x:a|y:1')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:2a')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1||')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:2|')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:2|:')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:2|m:')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|x:|m:3')))
- self.assertEqual('VectorClockRev(None)',
- repr(self.create_vcr('y:1|:|m:3')))
-
- def test_parse_single(self):
- self.assertEqual('VectorClockRev(test:1)',
- repr(self.create_vcr('test:1')))
-
- def test_parse_multi(self):
- self.assertEqual('VectorClockRev(test:1|z:2)',
- repr(self.create_vcr('test:1|z:2')))
- self.assertEqual('VectorClockRev(ab:1|bc:2|cd:3|de:4|ef:5)',
- repr(self.create_vcr('ab:1|bc:2|cd:3|de:4|ef:5')))
- self.assertEqual('VectorClockRev(a:2|b:1)',
- repr(self.create_vcr('b:1|a:2')))
-
-
-class TestCDocument(BackendTests):
-
- def make_document(self, *args, **kwargs):
- return c_backend_wrapper.make_document(*args, **kwargs)
-
- def test_create(self):
- self.make_document('doc-id', 'uid:1', tests.simple_doc)
-
- def assertPyDocEqualCDoc(self, *args, **kwargs):
- cdoc = self.make_document(*args, **kwargs)
- pydoc = Document(*args, **kwargs)
- self.assertEqual(pydoc, cdoc)
- self.assertEqual(cdoc, pydoc)
-
- def test_cmp_to_pydoc_equal(self):
- self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc)
- self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc,
- has_conflicts=False)
- self.assertPyDocEqualCDoc('doc-id', 'uid:1', tests.simple_doc,
- has_conflicts=True)
-
- def test_cmp_to_pydoc_not_equal_conflicts(self):
- cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc)
- pydoc = Document('doc-id', 'uid:1', tests.simple_doc,
- has_conflicts=True)
- self.assertNotEqual(cdoc, pydoc)
- self.assertNotEqual(pydoc, cdoc)
-
- def test_cmp_to_pydoc_not_equal_doc_id(self):
- cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc)
- pydoc = Document('doc2-id', 'uid:1', tests.simple_doc)
- self.assertNotEqual(cdoc, pydoc)
- self.assertNotEqual(pydoc, cdoc)
-
- def test_cmp_to_pydoc_not_equal_doc_rev(self):
- cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc)
- pydoc = Document('doc-id', 'uid:2', tests.simple_doc)
- self.assertNotEqual(cdoc, pydoc)
- self.assertNotEqual(pydoc, cdoc)
-
- def test_cmp_to_pydoc_not_equal_content(self):
- cdoc = self.make_document('doc-id', 'uid:1', tests.simple_doc)
- pydoc = Document('doc-id', 'uid:1', tests.nested_doc)
- self.assertNotEqual(cdoc, pydoc)
- self.assertNotEqual(pydoc, cdoc)
-
-
-class TestUUID(BackendTests):
-
- def test_uuid4_conformance(self):
- uuids = set()
- for i in range(20):
- uuid = c_backend_wrapper.generate_hex_uuid()
- self.assertIsInstance(uuid, str)
- self.assertEqual(32, len(uuid))
- # This will raise ValueError if it isn't a valid hex string
- long(uuid, 16)
- # Version 4 uuids have 2 other requirements, the high 4 bits of the
- # seventh byte are always '0x4', and the middle bits of byte 9 are
- # always set
- self.assertEqual('4', uuid[12])
- self.assertTrue(uuid[16] in '89ab')
- self.assertTrue(uuid not in uuids)
- uuids.add(uuid)