Adodb for Python เป็น API สำหรับให้ไพธอนติดต่อกับฐานข้อมูลใด ๆ
ข้อดีคือสามารถเปลี่ยนฐานข้อมูลโดยไม่ต้องเปลี่ยนคำสั่งการติดต่อ
ข้อเสียคือมีชั้นการรันเพิ่ม ทำให้เข้าถึงข้อมูลช้าลง
มีเอกสารภาษาไทยอยู่ที่ Exzilla.net - ADOdb MySQL Tutorial (Thai Translation)
ในที่นี้จะเขียนแค่ postgresql
# aptitude install python python-adodb postgresql-8.1 psycopg
import adodb; conn = adodb.NewADOConnection('postgres') conn.Connect('server','user','pwd','db') cursor = conn.Execute('select * from table') while not cursor.EOF: print cursor.fields cursor.MoveNext() # cursor.Close() conn.Close()
ตัวอย่างต่อไปแปลงมาจาก ADOdb Library for PHP ผิด ตก ยก เว้น
import adodb conn = adodb.ADONewConnection('access') cursor = conn.Connect('northwind') rs = conn.Execute('SELECT * FROM products') if not rs: print conn.ErrorMsg() else: while not rs.EOF: print "%s %s <br />" % (rs.fields[0], rs.fields[1]) rs.MoveNext() # # rs.Close() # optional conn.Close() # optional
import adodb conn = adodb.ADONewConnection('access') cur = conn.Connect('northwind') rs = conn.Execute('SELECT CustomerID, OrderDate FROM Orders') if not rs: print conn.ErrorMsg else: while not rs.EOF: field = rs.FetchField(1) metatype = rs.MetaType(field.type) if metatype=='D' or metatype=='T': print "%s %s <br />" % (rs.fields[0], rs.fields[1]) else: print "%s <br />" % (rs.fields[0]) # rs.MoveNext() # # rs.Close() # optional conn.Close() # optional
import adodb import datetime conn = adodb.ADONewConnection('access') cur = conn.Connect('northwind') shipto = conn.qstr("John's Old Shoppe") sql = "INSERT INTO orders (CustomerID,EmployeeID,OrderDate,ShipName)" \ + " VALUES ('ANNATR', 2, %s, %s" \ % (conn.DBDate(datetime.datetime.now()), shipto) if not conn.Execute(sql): print "Error insertion: %s <br />" % (conn.ErrorMsg()) #
import adodb import datetime conn = adodb.ADONewConnection('access') cur = conn.Connect('northwind') shipto = conn.qstr("John's Old Shoppe") sql = "INSERT INTO orders (CustomerID, EmployeeID, OrderDate, ShipName)"\ + " VALUES ('ANATR', 2, %s, %s)" \ % (conn.FormatDate(datetime.datetime.now()), shipto) conn.debug = True if not conn.Execute(sql): print "Error inserting" #
import sys try: curs = conn.Execute('select * from badtable') # table does not exist except: print sys.exc_info()[1] # retrieve the error message returned by database
วิธียืมจาก php - ต้องปิด useExceptions ก่อน
conn.useExceptions = False curs = conn.Execute('select * from badtable') # table does not exist if curs == None: print conn.ErrorMsg()
ต้องการพอร์ตข้อมูลที่เป็น dbf ของ Visual Foxpro มาลงใน postgres โดยใช้โมดูล adodb
ค้นกูเกิลเจอโมดูลในการอ่าน Visual Foxpro dbf ที่
เว็บของคุณ Yusdi Santoso หมวด Software Repository
ใช้งานได้ดีทีเดียว เพราะอ่าน Memo Field ของ Visual Foxpro ออกหมด
แต่ดันพลาดตอนไม่ได้ตรวจสอบข้อมูลของ Memo Field เพราะเขาเอาไบต์ที่เป็น Null มาด้วย ('\x00')
INSERT เข้าไปเท่าไหร่ก็แสดงข้อผิดพลาดอยู่ตลอด เสียเวลาทั้งวันในการดีบัก เพราะดันไปเพ่งเล็งที่ adodb กับ pyscopg
ข้อผิดพลาดที่แสดงคือ
psycopg.ProgrammingError: ERROR: unterminated quoted string at or near "'" at character XXX
ลองไปลองมาถึงได้ทราบว่าเป็นที่ Null character
แก้ด้วยการเติมฟังก์ชั่นการกรองข้อมูลคือ
def dncode(s): return s.strip().strip('\x00').decode('tis620').encode('utf8')
ให้ถอดรหัสอักขระด้วย tis620 ด้วย เพราะข้อมูลเก่าเป็นรหัสภาษาไทยแบบวินโดวส์
ผ่านเรียบร้อย โดยเสีย(ค่าฉลาดน้อย)ไปหนึ่งวัน
จากการทดลองนำเข้าข้อมูล dbf ในครั้งก่อน พบข้อผิดพลาดในการแปลงอีกอันนึง คือ สระอำ
หลังจากแปลงมาแล้ว พบว่าส่วนใหญ่จะแปลงได้ถูกต้อง ยกเว้นบางคำที่เขาแปลงออกมาเป็น 2 อักขระ
คือประกอบด้วย นิคหิต ( _ํ ) กับสระอา ( า ) แทนที่จะเป็นสระอำอักขระเดียว
ซึ่งยังไม่ทราบว่าเกิดจากสาเหตุอะไร (อาจเป็นข้อมูลต้นทางไม่ดีก็เป็นได้)
เราจึงควรตรวจสอบข้อมูล ในการนำเข้า ด้วยการกรองสระอำอีกชั้นนึงดังนี้
... def dncode(s): return s.strip().strip('\x00').decode('tis620').encode('utf8').replace('\xe0\xb9\x8d\xe0\xb8\xb2','\xe0\xb8\xb3') ...
update
ถึงเวลาใช้งานจริงก็ยังมีข้อมูลที่ไม่อยู่ในช่วงของ Ascii Codepage-874 หลุดออกมา ทำให้การถอดรหัส (decode('tis620')
) ยังรายงานข้อผิดพลาด
เราต้องถอดเอาอักขระขยะออกให้หมด
ฟังก์ชั่น dncode สุดท้ายจึงเป็นดังนี้
... def dncode(s): return s.strip().strip('\x00').strip('\xa0').strip('\xdb').strip('\xdc').strip('\xdd').strip('\xde').strip('\xfc').strip('\xfd').strip('\xfe').strip('\xff').decode('tis620').encode('utf8').replace('\xe0\xb9\x8d\xe0\xb8\xb2','\xe0\xb8\xb3') ...
ตอนทำงาน ไพธอนจะทำงานจากซ้ายไปขวา จึงต้อง strip อักขระขยะออกก่อน แล้วจึงตามด้วยการ decode/encode เป็นลำดับสุดท้าย
ลองทดสอบวัดประสิทธิภาพแบบคร่าว ๆ เพื่อหาวิธีการเขียนโค๊ด
เริ่มต้นด้วยการสร้างคลาสเพื่อจับเวลาก่อน
class TimeIt: import time def __init__(self): self.p_start=time.time() def use(self): t_now=time.time() t_use=t_now-self.p_start self.p_start=t_now return t_use
ตามด้วยการสั่งจากเชลล์ของไพธอน
import adodb conn = adodb.NewADOConnection("postgres") cur = conn.Connect(host, user, password, db) sql = """CREATE TABLE test (wordid SERIAL, word VARCHAR(255), PRIMARY KEY (wordid))""" cur = conn.Execute(sql)
แบบที่ ๑
def f1(): ttt = TimeIt() for i in range(1000): sql = """INSERT INTO test (word) VALUES ('abc%s')""" % (i,) cur = conn.Execute(sql) sql = """SELECT wordid FROM test WHERE word=%s""" % (conn.qstr(i),) cur = conn.Execute(sql) print ttt.use() sql = """DELETE FROM test""" cur = conn.Execute(sql)
ได้ผลเป็น
>>> f1() 9.27473306656 >>> f1() 9.16922688484 >>> f1() 9.21483206749 >>> f1() 9.20028710365 >>> f1() 10.2529239655 >>> f1() 9.16449689865
แบบที่ ๒
def f2(): ttt = TimeIt() for i in range(1000): sql = """INSERT INTO test (word) VALUES ('abc%s'); \ SELECT wordid FROM test WHERE word=%s""" % (i, conn.qstr(i),) cur = conn.Execute(sql) print ttt.use() sql = """DELETE FROM test""" cur = conn.Execute(sql)
ได้ผลเป็น
>>> f2() 9.11072301865 >>> f2() 9.20462703705 >>> f2() 9.24071407318 >>> f2() 9.25392103195 >>> f2() 9.02831697464 >>> f2() 9.07160282135 >>>
def f3(): def get_id(word): sql = """SELECT wordid FROM test WHERE word=%s""" % (conn.qstr(word),) cur = conn.Execute(sql) return cur.fields[0] def add_word(word): sql = """INSERT INTO test (word) VALUES (%s)""" % (conn.qstr(word),) cur = conn.Execute(sql) return get_id(word) ttt = TimeIt() for i in range(1000): add_word("abc%s" % (i,)) print ttt.use() sql = """DELETE FROM test""" cur = conn.Execute(sql)
ได้ผลเป็น
>>> f3() 9.02051997185 >>> f3() 9.06975603104 >>> f3() 9.30845808983 >>> f3() 9.06503009796 >>> f3() 9.89376401901 >>> f3() 9.28391385078 >>>
def f4(): def add_word(word): sql = """INSERT INTO test (word) VALUES (%s)""" % (conn.qstr(word),) cur = conn.Execute(sql) return cur ttt = TimeIt() for i in range(1000): add_word("abc%s" % (i,)) print ttt.use() sql = """DELETE FROM test""" cur = conn.Execute(sql)
ได้ผลเป็น
>>> f4() 8.93194293976 >>> f4() 8.94240808487 >>> f4() 9.14316105843 >>> f4() 8.93977403641 >>> f4() 9.05243611336 >>> f4() 9.06761908531 >>>
สรุป
ความเร็วแทบไม่ต่างกันเลย ฉะนั้น เขียนให้อ่านง่าย ปรับปรุงง่าย ดีที่สุด
ค้างทดสอบ (รอ postgres-8.2)
def fx(): ttt = TimeIt() for i in range(1000): sql = """INSERT INTO test (word) VALUES ('abc%s') RETURNING \ (SELECT wordid FROM test WHERE word=%s)""" % (i, conn.qstr(i),) cur = conn.Execute(sql) print ttt.use() sql = """DELETE FROM test""" cur = conn.Execute(sql)
RETURNING
มีใช้ใน Oracle กับ Postgres-8.2.3
Executes the equivalent following sql statement:
UPDATE table SET field = blob WHERE whereclause
The blobtype field should be set to either 'BLOB' or 'CLOB'.
Any special encoding required for the blob is applied
transparently.
Loads the binary file filepath into blob. Then
calls UpdateBlob( ).
Note that some databases return more information in each row.
Returns field information from a SELECT statement. The fieldoffset
is zero-based, so to retrieve info on the 1st field use FetchField(0).
A tuple is returned, consisting of:
(name, type_code,display_size, internal_size, precision,
scale,null_ok).
ก่อนอื่น จะสร้างฐานข้อมูลเพื่อใช้เป็นตัวอย่างก่อน
โดยจะสร้างตารางเป็นสมุดโทรศัพท์ ใส่ข้อมูลเบื้องต้นไป 5 แถว
(อย่าเชื่อ syntax มากนะครับ เริ่มหัดใหม่เหมือนกัน)
import adodb driver = 'postgres' host = 'host' user = 'user' password = 'password' db = 'db' conn = adodb.NewADOConnection(driver) cur = conn.Connect(host,user,password,db) sql = """\ CREATE TABLE phone ( pid INT, name VARCHAR(50), phone VARCHAR(50), category VARCHAR(50), update DATE, rem TEXT, img BYTEA)""" cur = conn.Execute(sql) from datetime import datetime today = conn.DBDate(datetime.date(datetime.today())) sql = "INSERT INTO phone \ (pid, name, phone, category, update, rem, img) VALUES " cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (1, 'name1', '0-0000-0001', 'cat1', today, 'rem1', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (2, 'name2', '0-0000-0002', 'cat2', today, 'rem2', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (3, 'name3', '0-0000-0003', 'cat3', today, 'rem3', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (4, 'name4', '0-0000-0004', 'cat4', today, 'rem4', '')) cur=conn.Execute(sql+"(%s, '%s', '%s', '%s', %s, '%s', '%s')" \ % (5, 'name5', '0-0000-0005', 'cat5', today, 'rem5', ''))
เพื่อไม่ให้เยิ่นเย้อ จะใช้ตัวแปร conn
ซึ่งจะหมายถึง
... conn = adodb.NewADOConnection(driver) ...
เป็นปกติของหน้านี้
>>> cur=conn.SelectLimit('SELECT * FROM phone', 2) (SAME AS) >>> cur=conn.Execute('SELECT * FROM phone LIMIT 2') >>> while not cur.EOF: ... print cur.fields ... cur.MoveNext() ... (1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', '') False (2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', '') True
>>> try: ... cur=conn.Execute('SELECT * FROM notable') ... except: ... print conn.ErrorMsg() ... ERROR: relation "notable" does not exist
SELECT * FROM notable
>>> conn=adodb.NewADOConnection('postgres') >>> conn.IsConnected() False >>> conn.Connect('host','user','password','database') True >>> conn.IsConnected() True
>>> print conn.qstr(2) '2'
qstr( )
>>> print conn.GetAll('SELECT * FROM phone') [(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', ''), (2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', ''), (3, 'name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b410>, 'rem3', ''), (4, 'name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b560>, 'rem4', ''), (5, 'name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b598>, 'rem5', '')]
>>> print conn.GetArray('SELECT * FROM phone') [(1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', ''), (2, 'name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b448>, 'rem2', ''), (3, 'name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b410>, 'rem3', ''), (4, 'name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b560>, 'rem4', ''), (5, 'name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b598>, 'rem5', '')]
>>> print conn.GetRow('SELECT * FROM phone') (1, 'name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4f0>, 'rem1', '')
>>> print conn.GetOne('SELECT * FROM phone') 1
>>> print conn.GetAssoc('SELECT * FROM phone') {1: ('name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b2f8>, 'rem1', ''), 2: ('name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b528>, 'rem2', ''), 3: ('name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b330>, 'rem3', ''), 4: ('name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b368>, 'rem4', ''), 5: ('name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b4b8>, 'rem5', '')} >>> dict = conn.GetAssoc('SELECT * FROM phone') >>> for i in dict: ... print i, dict[i] ... 1 ('name1', '0-0000-0001', 'cat1', <DateTime object for '2007-01-25 00:00:00.00' at b764b4b8>, 'rem1', '') 2 ('name2', '0-0000-0002', 'cat2', <DateTime object for '2007-01-25 00:00:00.00' at b764b368>, 'rem2', '') 3 ('name3', '0-0000-0003', 'cat3', <DateTime object for '2007-01-25 00:00:00.00' at b764b330>, 'rem3', '') 4 ('name4', '0-0000-0004', 'cat4', <DateTime object for '2007-01-25 00:00:00.00' at b764b528>, 'rem4', '') 5 ('name5', '0-0000-0005', 'cat5', <DateTime object for '2007-01-25 00:00:00.00' at b764b2f8>, 'rem5', '')
GetAssoc()
.>>> print conn.GetCol('SELECT * FROM phone') [1, 2, 3, 4, 5]
>>> print conn.MetaType('B') B >>> print conn.MetaType('BINARY') B >>> print conn.MetaType('date') D >>> print conn.MetaType('varchar') C >>> print conn.MetaType('IMAGE') B
Note that some databases return more information in each row.
>>> print conn.MetaColumns('phone') [('pid', 'int4', 4, -1, 0, 0, 1), ('name', 'varchar', -1, 54, 0, 0, 2), ('phone', 'varchar', -1, 54, 0, 0, 3), ('category', 'varchar', -1, 54, 0, 0, 4), ('update', 'date', 4, -1, 0, 0, 5), ('rem', 'text', -1, -1, 0, 0, 6), ('img', 'bytea', -1, -1, 0, 0, 7)] >>> for line in conn.MetaColumns('phone'): ... print line ... ('pid', 'int4', 4, -1, 0, 0, 1) ('name', 'varchar', -1, 54, 0, 0, 2) ('phone', 'varchar', -1, 54, 0, 0, 3) ('category', 'varchar', -1, 54, 0, 0, 4) ('update', 'date', 4, -1, 0, 0, 5) ('rem', 'text', -1, -1, 0, 0, 6) ('img', 'bytea', -1, -1, 0, 0, 7)
MetaColumns format:
field name, field type, field length, max. field length, is_null, is_serial, field order
>>> from datetime import datetime >>> today = conn.DBDate(datetime.date(datetime.today())) >>> print today '2007-01-25'
>>> now=conn.DBTimeStamp(datetime.datetime.now()) >>> print now '2007-01-25 00:00:00'
>>> print conn.Date('2007-01-25') 2007-01-25 00:00:00
>>> print conn.TimeStamp('2007-01-25 00:00:00') 2007-01-25 00:00:00
>>> dir(conn.Module()) ['BINARY', 'BOOLEAN', 'Binary', 'DATE', 'DATETIME', \ 'DataError', 'DatabaseError', 'Date', 'DateFromMx', \ 'DateFromTicks', 'Error', 'FLOAT', 'INTEGER', 'INTERVAL', \ 'IntegrityError', 'InterfaceError', 'InternalError', \ 'LONGINTEGER', 'NUMBER', 'NotSupportedError', \ 'OperationalError', 'ProgrammingError', 'QuotedString', \ 'ROWID', 'STRING', 'TIME', 'Time', 'TimeFromMx', \ 'TimeFromTicks', 'Timestamp', 'TimestampFromMx', \ 'TimestampFromTicks', 'Warning', '__doc__', '__file__', \ '__name__', '__version__', 'apilevel', 'connect', 'new_type', \ 'paramstyle', 'register_type', 'threadsafety', 'types']
>>> dir(conn.Conn()) ['autocommit', 'close', 'commit', 'cursor', 'cursors', \ 'maxconn', 'minconn', 'rollback', 'serialize', \ 'set_isolation_level']
>>> print conn.DriverInfo() Driver = postgres API Level = 2.0 Param Style = pyformat Thread Safety = 2 (0=none, 1=module, 2=connections, 3=cursors) -------------- None
เพื่อไม่ให้เยิ่นเย้อ จะใช้ตัวแปร cur
ซึ่งจะหมายถึง
... conn = adodb.NewADOConnection(driver) cur = conn.Execute('SELECT * FROM phone') ...
เป็นปกติของส่วนนี้
>>> print cur.RecordCount() 5
RecordCount( )
.>>> cur.MoveNext() False >>> cur.EOF False
>>> a=cur.FetchRow() >>> a (2, 'name2', '0-0000-0002', 'cat2', , 'rem2', '')
>>> print cur.GetRowAssoc() {'CATEGORY': 'cat3', 'NAME': 'name3', 'IMG': '', 'PID': 3, \ 'UPDATE': , 'PHONE': '0-0000-0003', 'REM': 'rem3'}
>>> print cur.FetchField(0) ('pid', 23, 1, 4, None, None, None) >>> cur.FetchField(1) ('name', 1043, 5, 50, None, None, None)
>>> cur.Close()
>>> dir(cur.Cursor()) ['arraysize', 'autocommit', 'callproc', 'close', 'commit', \ 'copy_from', 'copy_to', 'description', 'dictfetchall', \ 'dictfetchmany', 'dictfetchone', 'execute', 'executemany', \ 'fetchall', 'fetchmany', 'fetchone', 'fileno', 'lastoid', \ 'lastrowid', 'nextset', 'notifies', 'rollback', 'rowcount', \ 'scroll', 'setinputsizes', 'setoutputsize', 'statusmessage']
>>> print cur.fields (3, 'name3', '0-0000-0003', 'cat3', , 'rem3', '')
>>> print cur.EOF False
เกร็ด
>>> conn=adodb.NewADOConnection('postgres') >>> conn.Connect('host','user','password','') Traceback (most recent call last): File "<stdin>", line 1, in ? File "/usr/lib/python2.4/site-packages/adodb/adodb.py", line 199, in Connect self._connect(host,user,password,database) File "/usr/lib/python2.4/site-packages/adodb/adodb_postgres.py", line 46, in _connect self._conn = psycopg.connect(dsn) psycopg.OperationalError: FATAL: database "user" does not exist
แต่เราสามารถทำโดยอ้อมได้ มีข้อแม้ว่าต้องมี database อยู่ก่อน ก็คือเราติดต่อเข้าไปหา database ที่มีอยู่ ในที่นี้เราใช้ postgres ตัว database ที่มีอยู่แน่นอนก็คือ database ชื่อ postgres แล้วจึงค่อยสร้างใหม่จากคำสั่ง Execute SQL
>>> conn=adodb.NewADOConnection('postgres') >>> conn.Connect('host','user','password','postgres') True >>> cur=conn.Execute('CREATE DATABASE %s OWNER %s' % ('new_db','user')) >>> conn.Connect('host','user','password','new_db') True
######################################################################## # Vers 2.01 5 May 2006, (c)2004-2006 John Lim (jlim#natsoft.com.my) All Rights Reserved # Released under a BSD-style license. See LICENSE.txt. # Download: http://adodb.sourceforge.net/#pydownload ######################################################################## __author__ = "John Lim (jlim#natsoft.com)" __credits__ = "(c) 2004-2006 John Lim" import exceptions,sys,re from datetime import datetime try: True, False except NameError: # Maintain compatibility with Python 2.2 True, False = 1, 0 MapTypes = { 'VARCHAR' : 'C', 'VARCHAR2' : 'C', 'CHAR' : 'C', 'C' : 'C', 'STRING' : 'C', 'NCHAR' : 'C', 'NVARCHAR' : 'C', 'VARYING' : 'C', 'BPCHAR' : 'C', 'CHARACTER' : 'C', 'INTERVAL' : 'C', # Postgres ## 'LONGCHAR' : 'X', 'TEXT' : 'X', 'NTEXT' : 'X', 'M' : 'X', 'X' : 'X', 'CLOB' : 'X', 'NCLOB' : 'X', 'LVARCHAR' : 'X', ## 'BLOB' : 'B', 'IMAGE' : 'B', 'BINARY' : 'B', 'VARBINARY' : 'B', 'LONGBINARY' : 'B', 'B' : 'B', ## 'YEAR' : 'D', # mysql 'DATE' : 'D', 'D' : 'D', ## 'TIME' : 'T', 'TIMESTAMP' : 'T', 'DATETIME' : 'T', 'TIMESTAMPTZ' : 'T', 'T' : 'T', ## 'BOOL' : 'L', 'BOOLEAN' : 'L', 'BIT' : 'L', 'L' : 'L', ## 'COUNTER' : 'R', 'R' : 'R', 'SERIAL' : 'R', # ifx 'INT IDENTITY' : 'R', ## 'INT' : 'I', 'INTEGER' : 'I', 'INTEGER UNSIGNED' : 'I', 'SHORT' : 'I', 'TINYINT' : 'I', 'SMALLINT' : 'I', 'I' : 'I', ## 'LONG' : 'N', # interbase is numeric, oci8 is blob 'BIGINT' : 'N', # this is bigger than PHP 32-bit integers 'DECIMAL' : 'N', 'DEC' : 'N', 'REAL' : 'N', 'DOUBLE' : 'N', 'DOUBLE PRECISION' : 'N', 'SMALLFLOAT' : 'N', 'FLOAT' : 'N', 'NUMBER' : 'N', 'NUM' : 'N', 'NUMERIC' : 'N', 'MONEY' : 'N', ## informix 9.2 'SQLINT' : 'I', 'SQLSERIAL' : 'I', 'SQLSMINT' : 'I', 'SQLSMFLOAT' : 'N', 'SQLFLOAT' : 'N', 'SQLMONEY' : 'N', 'SQLDECIMAL' : 'N', 'SQLDATE' : 'D', 'SQLVCHAR' : 'C', 'SQLCHAR' : 'C', 'SQLDTIME' : 'T', 'SQLINTERVAL' : 'N', 'SQLBYTES' : 'B', 'SQLTEXT' : 'X'} class adodb_iter: def __iter__(self): def next(self): def NewADOConnection(modulename): def ADONewConnection(modulename): class ADOConnection: databaseType = None dataProvider = 'native' host = None user = None password = None database = None replaceQuote = "\\'" useExceptions = True debug = None getLOBs = True hasRowCount = True metaColSQL = 'Invalid' fmtDate = '%Y-%m-%d' fmtTimeStamp = '%Y-%m-%d %H:%M:%S' _errormsg = '' _errno = 0 _conn = None _autocommit = True _connected = True def __init__(self): pass def Connect(self,host=None,user=None,password=None,database=None): def IsConnected(self): def DriverInfo(self): def ErrorMsg(self): def ErrorNo(self): def qstr(self,s): def quote(self,s): def addq(self,s): def Conn(self): def _query(self,sql,params=None,_cursor=None): def SelectLimit(self,sql,limit,offset=-1,params=None): def Execute(self,sql,params=None): def UpdateBlob(self,table,field,blob,where,blobtype='BLOB'): def UpdateBlobFile(self,table,field,filepath,where,blobtype='BLOB'): def UpdateClob(self,table,field,blob,where): def GetRows(self,sql,params=None): def GetArray(self,sql,params=None): def GetAll(self,sql,params=None): def GetRow(self,sql,params=None): def GetRow(self,sql,params=None): def GetOne(self,sql,params=None): def GetCol(self, sql, params=None): def GetAssoc(self, sql, params=None): def GetDict(self, sql, params=None): def BeginTrans(self): def CommitTrans(self): def RollbackTrans(self): def Close(self): def DBDate(self,d): def DBTimeStamp(self,d): def Date(self,s): def TimeStamp(self,s): def MetaType(self, dbtype): def MetaColumns(self, table): class ADOCursor: _cursor = None fields = None EOF = False _rowcount = 0 _isselect = False _insertid = 0 _conn = None def __init__(self,rs,conn,norowcount=False): def __iter__(self): def RecordCount(self): def MoveNext(self): def FetchRow(self): # returns a tuple of the form (name, type_code,display_size, internal_size, precision, scale,null_ok) # note: databases could return name in upper or lower-case def FetchField(self,row): def Affected_Rows(self): def Insert_ID(self): def Cursor(self): def GetRowAssoc(self,upper=1): def Close(self): #=========================================================== # UNIT TESTING #=========================================================== def _Test_Eq(testid, correct, testval, errmsg=''): if correct == testval: print "Passed Test: "+testid else: print "" print "********* Failed Test: "+testid print "********************** "+str(errmsg) print "********************** expected="+str(correct) print "********************** actual="+str(testval) def Test_Blob(db): import os src = 'c:/lensserver.gif' dest = 'c:/testpy1.gif' try: os.unlink(dest) except: pass saved = db.debug saveb = db.getLOBs db.debug = True db.getLOBs = True db.UpdateBlobFile('photos','photo',src,'id=1') data = db.GetOne('select photo from photos where id=1') f = file(dest,'wb') f.write(data) f.close() rs = db.Execute('select * from photos') while not rs.EOF: print 'Fields=',rs.fields rs.MoveNext() print "=======================" rows = db.GetAll('select * from photos where id<=1') print rows db.getLOBs = saveb db.debug = saved def Test(db,debug=False): db.DriverInfo() if False: d = db.Date('2004-03-21') print '2004-03-21=',d d = db.TimeStamp('2004-03-22 12:50:51') print '2004-03-22 12:50:51=',d print "DBTimeStamp=", db.DBTimeStamp(d) db.useExceptions = True # use adodb error handling try: sql = 'select * from xadoxyz where 0 < id and id < 3' rs = db.Execute(sql) _Test_Eq('Bad SQL',None, rs, sql) except: print "And you should see an error message indicating bad table was defined: " print "err=",db.ErrorMsg() print "-----" rs = db.Execute('select * from ADOXYZ where 0 < id and id < 3 order by id') while not rs.EOF: print rs.fields rs.MoveNext() print "You should see 2 rows of data here:" rs = db.Execute('select * from adoxyz where 0 < id and id < 3 order by id') print "rows=",rs.RecordCount() while (not rs.EOF): print rs.GetRowAssoc() rs.MoveNext() print "-----" rs = db.Execute('select id,firstname from adoxyz where 0 < id and id < 3 order by id') _Test_Eq("Test FetchField",'FIRSTNAME',rs.FetchField(1)[0].upper()) if (debug): print rs.FetchField(1) cnt = 0 while 1: arr=rs.FetchRow() if arr == None: break cnt += 1 _Test_Eq('Execute 2.0',cnt,arr[0]) _Test_Eq('Execute 2.1',2,cnt) if rs.RecordCount() == -1: print "*** RecordCount not supported: -1" else: _Test_Eq('Execute 2.1 RecordCount',2,rs.RecordCount()) rs = db.Execute("delete from adoxyz where id=997") cnt = rs.Affected_Rows() _Test_Eq('Affected_Rows',1,cnt) ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (997,'python','snake')") if not ok: _Test_Eq('DELETE/INSERT','inserted row','failed insert') row = db.GetRow("select id,firstname from adoxyz where id=997"); _Test_Eq('GetRow',str(997)+' '+'python',str(int(row[0]))+' '+row[1].rstrip(),row) row = db.GetOne("select id,firstname from adoxyz where id=997"); _Test_Eq('GetOne',997,row) rs = db.SelectLimit("select id,firstname from adoxyz",3) cnt = 0 try: for row in rs: cnt += 1 #print rs.fields _Test_Eq('SelectLimit',3,cnt) except: print "Failed Iteration" print sys.exc_info()[1] d = db.GetOne('select created from adoxyz where id=1') d2 = db.TimeStamp(d) _Test_Eq('DBDate',str(d)[:19],str(d2)) if (db.qstr("\\show'boat") != "'\\\\show\\'boat'" and db.qstr("\\show'boat") != "'\\show''boat'"): _Test_Eq('qstr',"qstr(\\show'boat)", db.qstr("\\show'boat")) else: _Test_Eq('qstr','1','1') try: db.debug=True print "Testing GetAssoc" arr = db.GetAssoc('select firstname,lastname from adoxyz') print arr print "Testing GetCol" arr = db.GetCol('select firstname from adoxyz') print arr except: print sys.exc_info()[1] try: print "MetaColumns:" rows = db.MetaColumns('adoxyz') print rows except: print "Failed MetaColumns" print sys.exc_info()[1] try: db.BeginTrans() ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (1997,'python','snake')") db.RollbackTrans() val = db.GetOne('select * from adoxyz where id=1997') _Test_Eq('Rollback Test',None,val) except: print "Failed Rollback Test" print sys.exc_info()[1]