1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
#!/usr/bin/env python
# This is a test case I got from a user that will crash PySQLite 0.5.0.
# It stress-tests PySQLite in multithreaded mode.
import os
import sys
import threading
import time
import random
import sqlite
# Path of the scratch database file; test_sqlite_busy() deletes and recreates
# it, and every thread connects to it.
dbname = "test.db"
# Strategy the Reader threads use when connecting.  Leave exactly one
# assignment active; Reader.run() raises ValueError for any other value.
#MECHANISM = "no timeout"
#MECHANISM = "use timeout"
#MECHANISM = "use slow busy handler"
MECHANISM = "use fast busy handler"
class Modifier(threading.Thread):
    """Writer thread for the busy-handling stress test.

    Opens its own connection to the database, INSERTs one row into the
    ``meta`` table, sleeps for five one-second intervals and only then
    calls commit().  Presumably the delay keeps the write pending while
    the reader threads run -- confirm against the sqlite module's
    autocommit default.
    """

    def __init__(self, dbname):
        """Remember *dbname*, the path of the database file to modify."""
        threading.Thread.__init__(self)
        self.dbname = dbname

    def run(self):
        """Thread body: INSERT, sleep, commit, then close everything."""
        print("Modifier: start")
        connection = sqlite.connect(self.dbname)
        cursor = connection.cursor()

        print("Modifier: INSERTing")
        insert_sql = "INSERT INTO meta (name, value) VALUES (%s, %s)"
        cursor.execute(insert_sql, "foo", "blah blah blah")

        # Delay the commit so that other threads get a chance to touch the
        # database while this connection still has work outstanding.
        for second in range(5):
            print("Modifier: sleeping %d" % second)
            time.sleep(1)

        print("Modifier: committing")
        connection.commit()
        print("Modifier: committed")

        cursor.close()
        connection.close()
        print("Modifier: end")
class Reader(threading.Thread):
    """Reader thread for the busy-handling stress test.

    Connects to the database using the strategy selected by the
    module-level ``MECHANISM`` constant, sleeps for a random 0-3 seconds,
    then SELECTs the ``meta`` row whose name equals this thread's name and
    prints the first row returned.
    """

    def __init__(self, name, dbname):
        """Create a reader thread called *name* for database file *dbname*."""
        threading.Thread.__init__(self, name=name)
        self.dbname = dbname

    def busyHandler(self, delay, table, numAttempts):
        """Busy callback: log the call, sleep *delay* seconds, return 1.

        *delay* is the extra argument supplied when the handler is
        registered in run().  The return value 1 presumably asks SQLite to
        retry the operation -- confirm against the PySQLite documentation.
        """
        message = "Reader %s: busyHandler(delay=%r, table=%r, numAttempts=%r)" % (
            self.getName(), delay, table, numAttempts)
        print(message)
        time.sleep(delay)
        return 1

    def run(self):
        """Thread body: connect per MECHANISM, nap, SELECT, clean up.

        Raises ValueError if MECHANISM is not one of the four known
        strategy strings.
        """
        me = self.getName()
        print("Reader %s: start" % me)

        # Delay handed to busyHandler; None means no handler is installed.
        handler_delay = None
        if MECHANISM == "no timeout":
            connection = sqlite.connect(self.dbname)
        elif MECHANISM == "use timeout":
            connection = sqlite.connect(self.dbname, timeout=5000)
        elif MECHANISM == "use slow busy handler":
            connection = sqlite.connect(self.dbname)
            handler_delay = 1.0
        elif MECHANISM == "use fast busy handler":
            # NOTE(review): "Xtimeout" is not the "timeout" keyword used
            # above; it looks like the timeout was deliberately disabled so
            # that only the busy handler is exercised -- confirm.
            connection = sqlite.connect(self.dbname, Xtimeout=5000.0)
            handler_delay = 0.1
        else:
            raise ValueError("MECHANISM is not one of the expected values")
        if handler_delay is not None:
            connection.db.sqlite_busy_handler(self.busyHandler, handler_delay)

        nap = random.randint(0, 3)
        print("Reader %s: sleeping for %d seconds" % (me, nap))
        time.sleep(nap)
        print("Reader %s: waking up" % me)

        cursor = connection.cursor()
        print("Reader %s: SELECTing" % me)
        query = "SELECT name, value FROM meta WHERE name='%s'" % me
        cursor.execute(query)
        row = cursor.fetchone()
        print("Reader %s: SELECTed %s" % (me, row))

        cursor.close()
        connection.close()
        print("Reader %s: end" % me)
def test_sqlite_busy():
    """Exercise the handling of SQLITE_BUSY "errors", as discussed here:
        http://www.hwaci.com/sw/sqlite/faq.html#q7
        http://www.sqlite.org/cvstrac/wiki?p=MultiThreading

    Algorithm:
    - recreate the database from scratch with an empty ``meta`` table
    - start one Modifier thread that opens the database, starts modifying
      it and then sleeps for a while so other threads can get in there
    - start Reader thread(s) that SELECT from the database, to see whether
      they error out, block or time out (play with the timeout argument
      of .connect())
    - wait for every thread to finish
    """
    # Start from a clean slate: drop any leftover database and its journal.
    for stale_path in (dbname, dbname + "-journal"):
        if os.path.exists(stale_path):
            os.remove(stale_path)

    # Build the schema on a short-lived connection that is closed before
    # any worker thread starts.
    setup_connection = sqlite.connect(dbname)
    setup_cursor = setup_connection.cursor()
    setup_cursor.execute("CREATE TABLE meta (name STRING, value STRING)")
    setup_connection.commit()
    setup_cursor.close()
    setup_connection.close()

    readerNames = ("foo",)  # XXX "bar", "baz")
    workers = [Modifier(dbname)]
    for name in readerNames:
        workers.append(Reader(name, dbname))

    # The modifier comes first in the list, so it is started and joined
    # ahead of the readers.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
# Run the stress test only when this file is executed as a script.
if __name__ == "__main__":
    test_sqlite_busy()
|