Adodb for Python เป็น API สำหรับให้ไพธอนติดต่อกับฐานข้อมูลใด ๆ
ข้อดีคือสามารถเปลี่ยนฐานข้อมูลโดยไม่ต้องเปลี่ยนคำสั่งการติดต่อ
ข้อเสียคือมีชั้นการรันเพิ่ม ทำให้เข้าถึงข้อมูลช้าลง
มีเอกสารภาษาไทยอยู่ที่ Exzilla.net - ADOdb MySQL Tutorial (Thai Translation)
ในที่นี้จะเขียนแค่ postgresql
# aptitude install python python-adodb postgresql-8.1 psycopg
import adodb;
conn = adodb.NewADOConnection('postgres')
conn.Connect('server','user','pwd','db')
cursor = conn.Execute('select * from table')
while not cursor.EOF:
print cursor.fields
cursor.MoveNext()
#
cursor.Close()
conn.Close()ตัวอย่างต่อไปแปลงมาจาก ADOdb Library for PHP ผิด ตก ยก เว้น
import adodb
conn = adodb.ADONewConnection('access')
cursor = conn.Connect('northwind')
rs = conn.Execute('SELECT * FROM products')
if not rs:
print conn.ErrorMsg()
else:
while not rs.EOF:
print "%s %s <br />" % (rs.fields[0], rs.fields[1])
rs.MoveNext()
#
#
rs.Close() # optional
conn.Close() # optionalimport adodb
conn = adodb.ADONewConnection('access')
cur = conn.Connect('northwind')
rs = conn.Execute('SELECT CustomerID, OrderDate FROM Orders')
if not rs:
print conn.ErrorMsg
else:
while not rs.EOF:
field = rs.FetchField(1)
metatype = rs.MetaType(field.type)
if metatype=='D' or metatype=='T':
print "%s %s <br />" % (rs.fields[0], rs.fields[1])
else:
print "%s <br />" % (rs.fields[0])
#
rs.MoveNext()
#
#
rs.Close() # optional
conn.Close() # optionalimport adodb
import datetime
conn = adodb.ADONewConnection('access')
cur = conn.Connect('northwind')
shipto = conn.qstr("John's Old Shoppe")
sql = "INSERT INTO orders (CustomerID,EmployeeID,OrderDate,ShipName)" \
+ " VALUES ('ANNATR', 2, %s, %s" \
% (conn.DBDate(datetime.datetime.now()), shipto)
if not conn.Execute(sql):
print "Error insertion: %s <br />" % (conn.ErrorMsg())
#import adodb
import datetime
conn = adodb.ADONewConnection('access')
cur = conn.Connect('northwind')
shipto = conn.qstr("John's Old Shoppe")
sql = "INSERT INTO orders (CustomerID, EmployeeID, OrderDate, ShipName)"\
+ " VALUES ('ANATR', 2, %s, %s)" \
% (conn.FormatDate(datetime.datetime.now()), shipto)
conn.debug = True
if not conn.Execute(sql):
print "Error inserting"
#import sys
try:
curs = conn.Execute('select * from badtable') # table does not exist
except:
print sys.exc_info()[1] # retrieve the error message returned by databaseวิธียืมจาก php - ต้องปิด useExceptions ก่อน
conn.useExceptions = False
curs = conn.Execute('select * from badtable') # table does not exist
if curs == None:
print conn.ErrorMsg()ต้องการพอร์ตข้อมูลที่เป็น dbf ของ Visual Foxpro มาลงใน postgres โดยใช้โมดูล adodb
ค้นกูเกิลเจอโมดูลในการอ่าน Visual Foxpro dbf ที่
เว็บของคุณ Yusdi Santoso หมวด Software Repository
ใช้งานได้ดีทีเดียว เพราะอ่าน Memo Field ของ Visual Foxpro ออกหมด
แต่ดันพลาดตอนไม่ได้ตรวจสอบข้อมูลของ Memo Field เพราะเขาเอาไบต์ที่เป็น Null มาด้วย ('\x00')
INSERT เข้าไปเท่าไหร่ก็แสดงข้อผิดพลาดอยู่ตลอด เสียเวลาทั้งวันในการดีบัก เพราะดันไปเพ่งเล็งที่ adodb กับ pyscopg
ข้อผิดพลาดที่แสดงคือ
psycopg.ProgrammingError: ERROR: unterminated quoted string at or near "'" at character XXX
ลองไปลองมาถึงได้ทราบว่าเป็นที่ Null character
แก้ด้วยการเติมฟังก์ชั่นการกรองข้อมูลคือ
def dncode(s):
return s.strip().strip('\x00').decode('tis620').encode('utf8')
ให้ถอดรหัสอักขระด้วย tis620 ด้วย เพราะข้อมูลเก่าเป็นรหัสภาษาไทยแบบวินโดวส์
ผ่านเรียบร้อย โดยเสีย(ค่าฉลาดน้อย)ไปหนึ่งวัน
จากการทดลองนำเข้าข้อมูล dbf ในครั้งก่อน พบข้อผิดพลาดในการแปลงอีกอันนึง คือ สระอำ
หลังจากแปลงมาแล้ว พบว่าส่วนใหญ่จะแปลงได้ถูกต้อง ยกเว้นบางคำที่เขาแปลงออกมาเป็น 2 อักขระ
คือประกอบด้วย นิคหิต ( _ํ ) กับสระอา ( า ) แทนที่จะเป็นสระอำอักขระเดียว
ซึ่งยังไม่ทราบว่าเกิดจากสาเหตุอะไร (อาจเป็นข้อมูลต้นทางไม่ดีก็เป็นได้)
เราจึงควรตรวจสอบข้อมูล ในการนำเข้า ด้วยการกรองสระอำอีกชั้นนึงดังนี้
...
def dncode(s):
return s.strip().strip('\x00').decode('tis620').encode('utf8').replace('\xe0\xb9\x8d\xe0\xb8\xb2','\xe0\xb8\xb3')
...
update
ถึงเวลาใช้งานจริงก็ยังมีข้อมูลที่ไม่อยู่ในช่วงของ Ascii Codepage-874 หลุดออกมา ทำให้การถอดรหัส (decode('tis620')) ยังรายงานข้อผิดพลาด
เราต้องถอดเอาอักขระขยะออกให้หมด
ฟังก์ชั่น dncode สุดท้ายจึงเป็นดังนี้
...
def dncode(s):
return s.strip().strip('\x00').strip('\xa0').strip('\xdb').strip('\xdc').strip('\xdd').strip('\xde').strip('\xfc').strip('\xfd').strip('\xfe').strip('\xff').decode('tis620').encode('utf8').replace('\xe0\xb9\x8d\xe0\xb8\xb2','\xe0\xb8\xb3')
...ตอนทำงาน ไพธอนจะทำงานจากซ้ายไปขวา จึงต้อง strip อักขระขยะออกก่อน แล้วจึงตามด้วยการ decode/encode เป็นลำดับสุดท้าย
ลองทดสอบวัดประสิทธิภาพแบบคร่าว ๆ เพื่อหาวิธีการเขียนโค๊ด
เริ่มต้นด้วยการสร้างคลาสเพื่อจับเวลาก่อน
class TimeIt:
import time
def __init__(self):
self.p_start=time.time()
def use(self):
t_now=time.time()
t_use=t_now-self.p_start
self.p_start=t_now
return t_use
ตามด้วยการสั่งจากเชลล์ของไพธอน
import adodb
conn = adodb.NewADOConnection("postgres")
cur = conn.Connect(host, user, password, db)
sql = """CREATE TABLE test (wordid SERIAL, word VARCHAR(255), PRIMARY KEY (wordid))"""
cur = conn.Execute(sql)แบบที่ ๑
def f1():
ttt = TimeIt()
for i in range(1000):
sql = """INSERT INTO test (word) VALUES ('abc%s')""" % (i,)
cur = conn.Execute(sql)
sql = """SELECT wordid FROM test WHERE word=%s""" % (conn.qstr(i),)
cur = conn.Execute(sql)
print ttt.use()
sql = """DELETE FROM test"""
cur = conn.Execute(sql)
ได้ผลเป็น
>>> f1() 9.27473306656 >>> f1() 9.16922688484 >>> f1() 9.21483206749 >>> f1() 9.20028710365 >>> f1() 10.2529239655 >>> f1() 9.16449689865
แบบที่ ๒
def f2():
ttt = TimeIt()
for i in range(1000):
sql = """INSERT INTO test (word) VALUES ('abc%s'); \
SELECT wordid FROM test WHERE word=%s""" % (i, conn.qstr(i),)
cur = conn.Execute(sql)
print ttt.use()
sql = """DELETE FROM test"""
cur = conn.Execute(sql)
ได้ผลเป็น
>>> f2() 9.11072301865 >>> f2() 9.20462703705 >>> f2() 9.24071407318 >>> f2() 9.25392103195 >>> f2() 9.02831697464 >>> f2() 9.07160282135 >>>
def f3():
def get_id(word):
sql = """SELECT wordid FROM test WHERE word=%s""" % (conn.qstr(word),)
cur = conn.Execute(sql)
return cur.fields[0]
def add_word(word):
sql = """INSERT INTO test (word) VALUES (%s)""" % (conn.qstr(word),)
cur = conn.Execute(sql)
return get_id(word)
ttt = TimeIt()
for i in range(1000):
add_word("abc%s" % (i,))
print ttt.use()
sql = """DELETE FROM test"""
cur = conn.Execute(sql)
ได้ผลเป็น
>>> f3() 9.02051997185 >>> f3() 9.06975603104 >>> f3() 9.30845808983 >>> f3() 9.06503009796 >>> f3() 9.89376401901 >>> f3() 9.28391385078 >>>
def f4():
def add_word(word):
sql = """INSERT INTO test (word) VALUES (%s)""" % (conn.qstr(word),)
cur = conn.Execute(sql)
return cur
ttt = TimeIt()
for i in range(1000):
add_word("abc%s" % (i,))
print ttt.use()
sql = """DELETE FROM test"""
cur = conn.Execute(sql)
ได้ผลเป็น
>>> f4() 8.93194293976 >>> f4() 8.94240808487 >>> f4() 9.14316105843 >>> f4() 8.93977403641 >>> f4() 9.05243611336 >>> f4() 9.06761908531 >>>
สรุป
ความเร็วแทบไม่ต่างกันเลย ฉะนั้น เขียนให้อ่านง่าย ปรับปรุงง่าย ดีที่สุด
ค้างทดสอบ (รอ postgres-8.2)
def fx():
ttt = TimeIt()
for i in range(1000):
sql = """INSERT INTO test (word) VALUES ('abc%s') RETURNING \
(SELECT wordid FROM test WHERE word=%s)""" % (i, conn.qstr(i),)
cur = conn.Execute(sql)
print ttt.use()
sql = """DELETE FROM test"""
cur = conn.Execute(sql)
RETURNING มีใช้ใน Oracle กับ Postgres-8.2.3
Executes the equivalent following sql statement:
UPDATE table SET field = blob WHERE whereclause
The blobtype field should be set to either 'BLOB' or 'CLOB'.
Any special encoding required for the blob is applied
transparently.
Loads the binary file filepath into blob. Then
calls UpdateBlob( ).
Note that some databases return more information in each row.
Returns field information from a SELECT statement. The fieldoffset
is zero-based, so to retrieve info on the 1st field use FetchField(0).
A tuple is returned, consisting of:
(name, type_code,display_size, internal_size, precision,
scale,null_ok).
ก่อนอื่น จะสร้างฐานข้อมูลเพื่อใช้เป็นตัวอย่างก่อน
โดยจะสร้างตารางเป็นสมุดโทรศัพท์ ใส่ข้อมูลเบื้องต้นไป 5 แถว
(อย่าเชื่อ syntax มากนะครับ เริ่มหัดใหม่เหมือนกัน)
import adodb driver = 'postgres' host = 'host' user = 'user' password = 'password' db = 'db' conn = adodb.NewADOConnection(driver) cur = conn.Connect(host,user,password,db) sql = """\ CREATE TABLE phone ( pid INT, name VARCHAR(50), phone VARCHAR(50), category VARCHAR(50), update DATE, rem TEXT, img BYTEA)""" cur = conn.Execute(sql) from datetime import datetime today = conn.DBDate(datetime.date(datetime.today())) sql = "INSERT INTO phone \ (pid, name, phone, category, update, rem, img) VALUES " cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (1, 'name1', '0-0000-0001', 'cat1', today, 'rem1', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (2, 'name2', '0-0000-0002', 'cat2', today, 'rem2', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (3, 'name3', '0-0000-0003', 'cat3', today, 'rem3', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (4, 'name4', '0-0000-0004', 'cat4', today, 'rem4', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (5, 'name5', '0-0000-0005', 'cat5', today, 'rem5', ''))
เพื่อไม่ให้เยิ่นเย้อ จะใช้ตัวแปร conn ซึ่งจะหมายถึง
... conn = adodb.NewADOConnection(driver) ...
เป็นปกติของหน้านี้
>>> cur=conn.SelectLimit('SELECT * FROM phone', 2)
(SAME AS)
>>> cur=conn.Execute('SELECT * FROM phone LIMIT 2')
>>> while not cur.EOF:
... print cur.fields
... cur.MoveNext()
...
(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', '')
False
(2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', '')
True>>> try:
... cur=conn.Execute('SELECT * FROM notable')
... except:
... print conn.ErrorMsg()
...
ERROR: relation "notable" does not exist
SELECT * FROM notable>>> conn=adodb.NewADOConnection('postgres')
>>> conn.IsConnected()
False
>>> conn.Connect('host','user','password','database')
True
>>> conn.IsConnected()
True>>> print conn.qstr(2) '2'
qstr( )>>> print conn.GetAll('SELECT * FROM phone')
[(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', ''), (2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', ''), (3, 'name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b410>, 'rem3', ''), (4, 'name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b560>, 'rem4', ''), (5, 'name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b598>, 'rem5', '')]>>> print conn.GetArray('SELECT * FROM phone')
[(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', ''), (2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', ''), (3, 'name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b410>, 'rem3', ''), (4, 'name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b560>, 'rem4', ''), (5, 'name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b598>, 'rem5', '')]>>> print conn.GetRow('SELECT * FROM phone')
(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', '')>>> print conn.GetOne('SELECT * FROM phone')
1>>> print conn.GetAssoc('SELECT * FROM phone')
{1: ('name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b2f8>, 'rem1', ''), 2: ('name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b528>, 'rem2', ''), 3: ('name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b330>, 'rem3', ''), 4: ('name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b368>, 'rem4', ''), 5: ('name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b4b8>, 'rem5', '')}
>>> dict = conn.GetAssoc('SELECT * FROM phone')
>>> for i in dict:
... print i, dict[i]
...
1 ('name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4b8>, 'rem1', '')
2 ('name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b368>, 'rem2', '')
3 ('name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b330>, 'rem3', '')
4 ('name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b528>, 'rem4', '')
5 ('name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b2f8>, 'rem5', '')GetAssoc().>>> print conn.GetCol('SELECT * FROM phone')
[1, 2, 3, 4, 5]>>> print conn.MetaType('B')
B
>>> print conn.MetaType('BINARY')
B
>>> print conn.MetaType('date')
D
>>> print conn.MetaType('varchar')
C
>>> print conn.MetaType('IMAGE')
BNote that some databases return more information in each row.
>>> print conn.MetaColumns('phone')
[('pid', 'int4', 4, -1, 0, 0, 1), ('name', 'varchar', -1, 54, 0, 0, 2), ('phone', 'varchar', -1, 54, 0, 0, 3), ('category', 'varchar', -1, 54, 0, 0, 4), ('update', 'date', 4, -1, 0, 0, 5), ('rem', 'text', -1, -1, 0, 0, 6), ('img', 'bytea', -1, -1, 0, 0, 7)]
>>> for line in conn.MetaColumns('phone'):
... print line
...
('pid', 'int4', 4, -1, 0, 0, 1)
('name', 'varchar', -1, 54, 0, 0, 2)
('phone', 'varchar', -1, 54, 0, 0, 3)
('category', 'varchar', -1, 54, 0, 0, 4)
('update', 'date', 4, -1, 0, 0, 5)
('rem', 'text', -1, -1, 0, 0, 6)
('img', 'bytea', -1, -1, 0, 0, 7)
MetaColumns format:
field name, field type, field length, max. field length, is_null, is_serial, field order
>>> from datetime import datetime >>> today = conn.DBDate(datetime.date(datetime.today())) >>> print today '2007-01-25'
>>> now=conn.DBTimeStamp(datetime.datetime.now()) >>> print now '2007-01-25 00:00:00'
>>> print conn.Date('2007-01-25')
2007-01-25 00:00:00>>> print conn.TimeStamp('2007-01-25 00:00:00')
2007-01-25 00:00:00>>> dir(conn.Module()) ['BINARY', 'BOOLEAN', 'Binary', 'DATE', 'DATETIME', \ 'DataError', 'DatabaseError', 'Date', 'DateFromMx', \ 'DateFromTicks', 'Error', 'FLOAT', 'INTEGER', 'INTERVAL', \ 'IntegrityError', 'InterfaceError', 'InternalError', \ 'LONGINTEGER', 'NUMBER', 'NotSupportedError', \ 'OperationalError', 'ProgrammingError', 'QuotedString', \ 'ROWID', 'STRING', 'TIME', 'Time', 'TimeFromMx', \ 'TimeFromTicks', 'Timestamp', 'TimestampFromMx', \ 'TimestampFromTicks', 'Warning', '__doc__', '__file__', \ '__name__', '__version__', 'apilevel', 'connect', 'new_type', \ 'paramstyle', 'register_type', 'threadsafety', 'types']
>>> dir(conn.Conn()) ['autocommit', 'close', 'commit', 'cursor', 'cursors', \ 'maxconn', 'minconn', 'rollback', 'serialize', \ 'set_isolation_level']
>>> print conn.DriverInfo() Driver = postgres API Level = 2.0 Param Style = pyformat Thread Safety = 2 (0=none, 1=module, 2=connections, 3=cursors) -------------- None
เพื่อไม่ให้เยิ่นเย้อ จะใช้ตัวแปร cur ซึ่งจะหมายถึง
...
conn = adodb.NewADOConnection(driver)
cur = conn.Execute('SELECT * FROM phone')
...เป็นปกติของส่วนนี้
>>> print cur.RecordCount() 5
RecordCount( ).>>> cur.MoveNext() False >>> cur.EOF False
>>> a=cur.FetchRow() >>> a (2, 'name2', '0-0000-0002', 'cat2', , 'rem2', '')
>>> print cur.GetRowAssoc()
{'CATEGORY': 'cat3', 'NAME': 'name3', 'IMG': '', 'PID': 3, \
'UPDATE': , 'PHONE': '0-0000-0003', 'REM': 'rem3'}>>> print cur.FetchField(0)
('pid', 23, 1, 4, None, None, None)
>>> cur.FetchField(1)
('name', 1043, 5, 50, None, None, None)>>> cur.Close()
>>> dir(cur.Cursor()) ['arraysize', 'autocommit', 'callproc', 'close', 'commit', \ 'copy_from', 'copy_to', 'description', 'dictfetchall', \ 'dictfetchmany', 'dictfetchone', 'execute', 'executemany', \ 'fetchall', 'fetchmany', 'fetchone', 'fileno', 'lastoid', \ 'lastrowid', 'nextset', 'notifies', 'rollback', 'rowcount', \ 'scroll', 'setinputsizes', 'setoutputsize', 'statusmessage']
>>> print cur.fields (3, 'name3', '0-0000-0003', 'cat3', , 'rem3', '')
>>> print cur.EOF False
เกร็ด
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','')
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "/usr/lib/python2.4/site-packages/adodb/adodb.py", line 199, in Connect
self._connect(host,user,password,database)
File "/usr/lib/python2.4/site-packages/adodb/adodb_postgres.py", line 46, in _connect
self._conn = psycopg.connect(dsn)
psycopg.OperationalError: FATAL: database "user" does not existแต่เราสามารถทำโดยอ้อมได้ มีข้อแม้ว่าต้องมี database อยู่ก่อน ก็คือเราติดต่อเข้าไปหา database ที่มีอยู่ ในที่นี้เราใช้ postgres ตัว database ที่มีอยู่แน่นอนก็คือ database ชื่อ postgres แล้วจึงค่อยสร้างใหม่จากคำสั่ง Execute SQL
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','postgres')
True
>>> cur=conn.Execute('CREATE DATABASE %s OWNER %s' % ('new_db','user'))
>>> conn.Connect('host','user','password','new_db')
True########################################################################
# Vers 2.01 5 May 2006, (c)2004-2006 John Lim (jlim#natsoft.com.my) All Rights Reserved
# Released under a BSD-style license. See LICENSE.txt.
# Download: http://adodb.sourceforge.net/#pydownload
########################################################################
__author__ = "John Lim (jlim#natsoft.com)"
__credits__ = "(c) 2004-2006 John Lim"
import exceptions,sys,re
from datetime import datetime
try:
True, False
except NameError:
# Maintain compatibility with Python 2.2
True, False = 1, 0
MapTypes = {
'VARCHAR' : 'C',
'VARCHAR2' : 'C',
'CHAR' : 'C',
'C' : 'C',
'STRING' : 'C',
'NCHAR' : 'C',
'NVARCHAR' : 'C',
'VARYING' : 'C',
'BPCHAR' : 'C',
'CHARACTER' : 'C',
'INTERVAL' : 'C', # Postgres
##
'LONGCHAR' : 'X',
'TEXT' : 'X',
'NTEXT' : 'X',
'M' : 'X',
'X' : 'X',
'CLOB' : 'X',
'NCLOB' : 'X',
'LVARCHAR' : 'X',
##
'BLOB' : 'B',
'IMAGE' : 'B',
'BINARY' : 'B',
'VARBINARY' : 'B',
'LONGBINARY' : 'B',
'B' : 'B',
##
'YEAR' : 'D', # mysql
'DATE' : 'D',
'D' : 'D',
##
'TIME' : 'T',
'TIMESTAMP' : 'T',
'DATETIME' : 'T',
'TIMESTAMPTZ' : 'T',
'T' : 'T',
##
'BOOL' : 'L',
'BOOLEAN' : 'L',
'BIT' : 'L',
'L' : 'L',
##
'COUNTER' : 'R',
'R' : 'R',
'SERIAL' : 'R', # ifx
'INT IDENTITY' : 'R',
##
'INT' : 'I',
'INTEGER' : 'I',
'INTEGER UNSIGNED' : 'I',
'SHORT' : 'I',
'TINYINT' : 'I',
'SMALLINT' : 'I',
'I' : 'I',
##
'LONG' : 'N', # interbase is numeric, oci8 is blob
'BIGINT' : 'N', # this is bigger than PHP 32-bit integers
'DECIMAL' : 'N',
'DEC' : 'N',
'REAL' : 'N',
'DOUBLE' : 'N',
'DOUBLE PRECISION' : 'N',
'SMALLFLOAT' : 'N',
'FLOAT' : 'N',
'NUMBER' : 'N',
'NUM' : 'N',
'NUMERIC' : 'N',
'MONEY' : 'N',
## informix 9.2
'SQLINT' : 'I',
'SQLSERIAL' : 'I',
'SQLSMINT' : 'I',
'SQLSMFLOAT' : 'N',
'SQLFLOAT' : 'N',
'SQLMONEY' : 'N',
'SQLDECIMAL' : 'N',
'SQLDATE' : 'D',
'SQLVCHAR' : 'C',
'SQLCHAR' : 'C',
'SQLDTIME' : 'T',
'SQLINTERVAL' : 'N',
'SQLBYTES' : 'B',
'SQLTEXT' : 'X'}
class adodb_iter:
def __iter__(self):
def next(self):
def NewADOConnection(modulename):
def ADONewConnection(modulename):
class ADOConnection:
databaseType = None
dataProvider = 'native'
host = None
user = None
password = None
database = None
replaceQuote = "\\'"
useExceptions = True
debug = None
getLOBs = True
hasRowCount = True
metaColSQL = 'Invalid'
fmtDate = '%Y-%m-%d'
fmtTimeStamp = '%Y-%m-%d %H:%M:%S'
_errormsg = ''
_errno = 0
_conn = None
_autocommit = True
_connected = True
def __init__(self):
pass
def Connect(self,host=None,user=None,password=None,database=None):
def IsConnected(self):
def DriverInfo(self):
def ErrorMsg(self):
def ErrorNo(self):
def qstr(self,s):
def quote(self,s):
def addq(self,s):
def Conn(self):
def _query(self,sql,params=None,_cursor=None):
def SelectLimit(self,sql,limit,offset=-1,params=None):
def Execute(self,sql,params=None):
def UpdateBlob(self,table,field,blob,where,blobtype='BLOB'):
def UpdateBlobFile(self,table,field,filepath,where,blobtype='BLOB'):
def UpdateClob(self,table,field,blob,where):
def GetRows(self,sql,params=None):
def GetArray(self,sql,params=None):
def GetAll(self,sql,params=None):
def GetRow(self,sql,params=None):
def GetRow(self,sql,params=None):
def GetOne(self,sql,params=None):
def GetCol(self, sql, params=None):
def GetAssoc(self, sql, params=None):
def GetDict(self, sql, params=None):
def BeginTrans(self):
def CommitTrans(self):
def RollbackTrans(self):
def Close(self):
def DBDate(self,d):
def DBTimeStamp(self,d):
def Date(self,s):
def TimeStamp(self,s):
def MetaType(self, dbtype):
def MetaColumns(self, table):
class ADOCursor:
_cursor = None
fields = None
EOF = False
_rowcount = 0
_isselect = False
_insertid = 0
_conn = None
def __init__(self,rs,conn,norowcount=False):
def __iter__(self):
def RecordCount(self):
def MoveNext(self):
def FetchRow(self):
# returns a tuple of the form (name, type_code,display_size, internal_size, precision, scale,null_ok)
# note: databases could return name in upper or lower-case
def FetchField(self,row):
def Affected_Rows(self):
def Insert_ID(self):
def Cursor(self):
def GetRowAssoc(self,upper=1):
def Close(self):
#===========================================================
# UNIT TESTING
#===========================================================
def _Test_Eq(testid, correct, testval, errmsg=''):
if correct == testval:
print "Passed Test: "+testid
else:
print ""
print "********* Failed Test: "+testid
print "********************** "+str(errmsg)
print "********************** expected="+str(correct)
print "********************** actual="+str(testval)
def Test_Blob(db):
import os
src = 'c:/lensserver.gif'
dest = 'c:/testpy1.gif'
try:
os.unlink(dest)
except:
pass
saved = db.debug
saveb = db.getLOBs
db.debug = True
db.getLOBs = True
db.UpdateBlobFile('photos','photo',src,'id=1')
data = db.GetOne('select photo from photos where id=1')
f = file(dest,'wb')
f.write(data)
f.close()
rs = db.Execute('select * from photos')
while not rs.EOF:
print 'Fields=',rs.fields
rs.MoveNext()
print "======================="
rows = db.GetAll('select * from photos where id<=1')
print rows
db.getLOBs = saveb
db.debug = saved
def Test(db,debug=False):
db.DriverInfo()
if False:
d = db.Date('2004-03-21')
print '2004-03-21=',d
d = db.TimeStamp('2004-03-22 12:50:51')
print '2004-03-22 12:50:51=',d
print "DBTimeStamp=", db.DBTimeStamp(d)
db.useExceptions = True # use adodb error handling
try:
sql = 'select * from xadoxyz where 0 < id and id < 3'
rs = db.Execute(sql)
_Test_Eq('Bad SQL',None, rs, sql)
except:
print "And you should see an error message indicating bad table was defined: "
print "err=",db.ErrorMsg()
print "-----"
rs = db.Execute('select * from ADOXYZ where 0 < id and id < 3 order by id')
while not rs.EOF:
print rs.fields
rs.MoveNext()
print "You should see 2 rows of data here:"
rs = db.Execute('select * from adoxyz where 0 < id and id < 3 order by id')
print "rows=",rs.RecordCount()
while (not rs.EOF):
print rs.GetRowAssoc()
rs.MoveNext()
print "-----"
rs = db.Execute('select id,firstname from adoxyz where 0 < id and id < 3 order by id')
_Test_Eq("Test FetchField",'FIRSTNAME',rs.FetchField(1)[0].upper())
if (debug): print rs.FetchField(1)
cnt = 0
while 1:
arr=rs.FetchRow()
if arr == None: break
cnt += 1
_Test_Eq('Execute 2.0',cnt,arr[0])
_Test_Eq('Execute 2.1',2,cnt)
if rs.RecordCount() == -1: print "*** RecordCount not supported: -1"
else: _Test_Eq('Execute 2.1 RecordCount',2,rs.RecordCount())
rs = db.Execute("delete from adoxyz where id=997")
cnt = rs.Affected_Rows()
_Test_Eq('Affected_Rows',1,cnt)
ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (997,'python','snake')")
if not ok: _Test_Eq('DELETE/INSERT','inserted row','failed insert')
row = db.GetRow("select id,firstname from adoxyz where id=997");
_Test_Eq('GetRow',str(997)+' '+'python',str(int(row[0]))+' '+row[1].rstrip(),row)
row = db.GetOne("select id,firstname from adoxyz where id=997");
_Test_Eq('GetOne',997,row)
rs = db.SelectLimit("select id,firstname from adoxyz",3)
cnt = 0
try:
for row in rs:
cnt += 1
#print rs.fields
_Test_Eq('SelectLimit',3,cnt)
except:
print "Failed Iteration"
print sys.exc_info()[1]
d = db.GetOne('select created from adoxyz where id=1')
d2 = db.TimeStamp(d)
_Test_Eq('DBDate',str(d)[:19],str(d2))
if (db.qstr("\\show'boat") != "'\\\\show\\'boat'" and db.qstr("\\show'boat") != "'\\show''boat'"):
_Test_Eq('qstr',"qstr(\\show'boat)", db.qstr("\\show'boat"))
else:
_Test_Eq('qstr','1','1')
try:
db.debug=True
print "Testing GetAssoc"
arr = db.GetAssoc('select firstname,lastname from adoxyz')
print arr
print "Testing GetCol"
arr = db.GetCol('select firstname from adoxyz')
print arr
except:
print sys.exc_info()[1]
try:
print "MetaColumns:"
rows = db.MetaColumns('adoxyz')
print rows
except:
print "Failed MetaColumns"
print sys.exc_info()[1]
try:
db.BeginTrans()
ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (1997,'python','snake')")
db.RollbackTrans()
val = db.GetOne('select * from adoxyz where id=1997')
_Test_Eq('Rollback Test',None,val)
except:
print "Failed Rollback Test"
print sys.exc_info()[1]