เกร็ด
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','')
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "/usr/lib/python2.4/site-packages/adodb/adodb.py", line 199, in Connect
self._connect(host,user,password,database)
File "/usr/lib/python2.4/site-packages/adodb/adodb_postgres.py", line 46, in _connect
self._conn = psycopg.connect(dsn)
psycopg.OperationalError: FATAL: database "user" does not exist
แต่เราสามารถทำโดยอ้อมได้ มีข้อแม้ว่าต้องมี database อยู่ก่อน ก็คือเราติดต่อเข้าไปหา database ที่มีอยู่ ในที่นี้เราใช้ postgres ตัว database ที่มีอยู่แน่นอนก็คือ database ชื่อ postgres แล้วจึงค่อยสร้างใหม่จากคำสั่ง Execute SQL
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','postgres')
True
>>> cur=conn.Execute('CREATE DATABASE %s OWNER %s' % ('new_db','user'))
>>> conn.Connect('host','user','password','new_db')
True
########################################################################
# Vers 2.01 5 May 2006, (c)2004-2006 John Lim (jlim#natsoft.com.my) All Rights Reserved
# Released under a BSD-style license. See LICENSE.txt.
# Download: http://adodb.sourceforge.net/#pydownload
########################################################################
__author__ = "John Lim (jlim#natsoft.com)"
__credits__ = "(c) 2004-2006 John Lim"
import exceptions,sys,re
from datetime import datetime
# Probe for the names True and False: merely evaluating them raises NameError
# on an interpreter that does not define them.
try:
True, False
except NameError:
# Maintain compatibility with Python 2.2
# Fall back to the integers 1 and 0 when the names are missing.
True, False = 1, 0
# Lookup table from an upper-case database column type name to a
# one-letter type code.  Entries are grouped by the code they map to.
MapTypes = {
    # --- 'C': character / varchar types ---
    'VARCHAR': 'C',
    'VARCHAR2': 'C',
    'CHAR': 'C',
    'C': 'C',
    'STRING': 'C',
    'NCHAR': 'C',
    'NVARCHAR': 'C',
    'VARYING': 'C',
    'BPCHAR': 'C',
    'CHARACTER': 'C',
    'INTERVAL': 'C',  # Postgres

    # --- 'X': long text / CLOB types ---
    'LONGCHAR': 'X',
    'TEXT': 'X',
    'NTEXT': 'X',
    'M': 'X',
    'X': 'X',
    'CLOB': 'X',
    'NCLOB': 'X',
    'LVARCHAR': 'X',

    # --- 'B': binary / BLOB types ---
    'BLOB': 'B',
    'IMAGE': 'B',
    'BINARY': 'B',
    'VARBINARY': 'B',
    'LONGBINARY': 'B',
    'B': 'B',

    # --- 'D': date types ---
    'YEAR': 'D',  # mysql
    'DATE': 'D',
    'D': 'D',

    # --- 'T': time / timestamp types ---
    'TIME': 'T',
    'TIMESTAMP': 'T',
    'DATETIME': 'T',
    'TIMESTAMPTZ': 'T',
    'T': 'T',

    # --- 'L': boolean / bit types ---
    'BOOL': 'L',
    'BOOLEAN': 'L',
    'BIT': 'L',
    'L': 'L',

    # --- 'R': counter / serial types ---
    'COUNTER': 'R',
    'R': 'R',
    'SERIAL': 'R',  # ifx
    'INT IDENTITY': 'R',

    # --- 'I': integer types ---
    'INT': 'I',
    'INTEGER': 'I',
    'INTEGER UNSIGNED': 'I',
    'SHORT': 'I',
    'TINYINT': 'I',
    'SMALLINT': 'I',
    'I': 'I',

    # --- 'N': numeric / floating-point types ---
    'LONG': 'N',  # interbase is numeric, oci8 is blob
    'BIGINT': 'N',  # this is bigger than PHP 32-bit integers
    'DECIMAL': 'N',
    'DEC': 'N',
    'REAL': 'N',
    'DOUBLE': 'N',
    'DOUBLE PRECISION': 'N',
    'SMALLFLOAT': 'N',
    'FLOAT': 'N',
    'NUMBER': 'N',
    'NUM': 'N',
    'NUMERIC': 'N',
    'MONEY': 'N',

    # --- informix 9.2 type names (mixed codes) ---
    'SQLINT': 'I',
    'SQLSERIAL': 'I',
    'SQLSMINT': 'I',
    'SQLSMFLOAT': 'N',
    'SQLFLOAT': 'N',
    'SQLMONEY': 'N',
    'SQLDECIMAL': 'N',
    'SQLDATE': 'D',
    'SQLVCHAR': 'C',
    'SQLCHAR': 'C',
    'SQLDTIME': 'T',
    'SQLINTERVAL': 'N',
    'SQLBYTES': 'B',
    'SQLTEXT': 'X',
}
# Helper class declaring __iter__ and next, the method pair Python 2 uses for
# its iterator protocol.
# NOTE(review): this listing has lost its indentation and omits the method
# bodies, so what the iterator yields cannot be documented from here.
class adodb_iter:
def __iter__(self):
def next(self):
# Factory taking a driver name.  The transcript earlier in this document calls
# adodb.NewADOConnection('postgres') and then uses Connect() and Execute() on
# the object it returns.
# NOTE(review): the body is not shown in this listing.
def NewADOConnection(modulename):
# Takes the same single argument as NewADOConnection; presumably an alternate
# spelling of that factory -- the body is not shown here, so confirm.
def ADONewConnection(modulename):
# Connection class: class-level default attributes followed by the public
# method signatures.
# NOTE(review): indentation and all method bodies except __init__ are absent
# from this listing; the comments below describe only what is visible.
class ADOConnection:
# Driver identification; databaseType has no default here.
databaseType = None
dataProvider = 'native'
# Connection parameters; Connect() accepts arguments with the same names.
host = None
user = None
password = None
database = None
# Two-character string: a backslash followed by a single quote.
# Presumably what qstr() substitutes for an embedded quote -- confirm.
replaceQuote = "\\'"
# Behaviour switches; Test() sets useExceptions and Test_Blob() toggles
# debug and getLOBs on a connection instance.
useExceptions = True
debug = None
getLOBs = True
hasRowCount = True
# Placeholder text, presumably replaced with real SQL elsewhere -- confirm.
metaColSQL = 'Invalid'
# strftime-style patterns for dates and timestamps.
fmtDate = '%Y-%m-%d'
fmtTimeStamp = '%Y-%m-%d %H:%M:%S'
# Internal state defaults.
_errormsg = ''
_errno = 0
_conn = None
_autocommit = True
# NOTE(review): the class default is True, i.e. set before any Connect()
# call has happened -- confirm this is intended.
_connected = True
# The constructor does nothing; every instance starts from the class defaults.
def __init__(self):
pass
# --- connection management and error reporting ---
def Connect(self,host=None,user=None,password=None,database=None):
def IsConnected(self):
def DriverInfo(self):
def ErrorMsg(self):
def ErrorNo(self):
# --- quoting helpers ---
def qstr(self,s):
def quote(self,s):
def addq(self,s):
def Conn(self):
# --- statement execution ---
def _query(self,sql,params=None,_cursor=None):
def SelectLimit(self,sql,limit,offset=-1,params=None):
def Execute(self,sql,params=None):
# --- large-object updates ---
def UpdateBlob(self,table,field,blob,where,blobtype='BLOB'):
def UpdateBlobFile(self,table,field,filepath,where,blobtype='BLOB'):
def UpdateClob(self,table,field,blob,where):
# --- convenience fetchers (each takes SQL text plus optional params) ---
def GetRows(self,sql,params=None):
def GetArray(self,sql,params=None):
def GetAll(self,sql,params=None):
# NOTE(review): GetRow is declared twice in a row with the same signature.
# In a Python class body a later def rebinds the name, so only the second
# would take effect; confirm whether this is a duplicate in the real source
# or an artifact of this listing.
def GetRow(self,sql,params=None):
def GetRow(self,sql,params=None):
def GetOne(self,sql,params=None):
def GetCol(self, sql, params=None):
def GetAssoc(self, sql, params=None):
def GetDict(self, sql, params=None):
# --- transactions and shutdown ---
def BeginTrans(self):
def CommitTrans(self):
def RollbackTrans(self):
def Close(self):
# --- date/time conversion helpers ---
def DBDate(self,d):
def DBTimeStamp(self,d):
def Date(self,s):
def TimeStamp(self,s):
# --- schema metadata ---
def MetaType(self, dbtype):
def MetaColumns(self, table):
# Cursor / recordset class: class-level defaults followed by the public method
# signatures.  The Test functions in this file drive instances through EOF,
# fields, MoveNext(), FetchRow(), FetchField(), RecordCount(),
# Affected_Rows() and GetRowAssoc().
# NOTE(review): indentation and method bodies are absent from this listing.
class ADOCursor:
_cursor = None
# Current row; the Test functions print it while looping on EOF.
fields = None
# End-of-data flag; callers loop with "while not rs.EOF".
EOF = False
_rowcount = 0
_isselect = False
_insertid = 0
_conn = None
def __init__(self,rs,conn,norowcount=False):
def __iter__(self):
# Test() treats a return value of -1 as "RecordCount not supported".
def RecordCount(self):
def MoveNext(self):
# Test() stops its fetch loop when this returns None.
def FetchRow(self):
# returns a tuple of the form (name, type_code,display_size, internal_size, precision, scale,null_ok)
# note: databases could return name in upper or lower-case
def FetchField(self,row):
def Affected_Rows(self):
def Insert_ID(self):
def Cursor(self):
def GetRowAssoc(self,upper=1):
def Close(self):
#===========================================================
# UNIT TESTING
#===========================================================
def _Test_Eq(testid, correct, testval, errmsg=''):
    """Print a pass/fail report comparing an expected and an actual value.

    testid  -- label identifying the test in the printed report (a string)
    correct -- the expected value
    testval -- the actual value; the test passes when correct == testval
    errmsg  -- extra context, printed via str() only when the test fails

    Returns None; the outcome is reported on stdout only.
    """
    # Every print below takes a single parenthesised expression, so the output
    # is the same under the Python 2 print statement and the Python 3 function.
    if correct == testval:
        print("Passed Test: "+testid)
        return

    print("")
    print("********* Failed Test: "+testid)
    print("********************** "+str(errmsg))
    print("********************** expected="+str(correct))
    print("********************** actual="+str(testval))
def Test_Blob(db, src='c:/lensserver.gif', dest='c:/testpy1.gif'):
    """Round-trip a file through the photos.photo column and dump the table.

    db   -- connection object providing debug, getLOBs, UpdateBlobFile(),
            GetOne(), Execute() and GetAll()
    src  -- path of the file stored into photos.photo for the row id=1
    dest -- path the value read back from the database is written to; any
            existing file at this path is removed first

    db.debug and db.getLOBs are forced to True for the duration of the test
    and restored afterwards, even when a database call raises.
    """
    import os

    # Remove output left by a previous run; a missing file is the normal case.
    try:
        os.unlink(dest)
    except OSError:
        pass

    saved = db.debug
    saveb = db.getLOBs
    db.debug = True
    db.getLOBs = True
    try:
        db.UpdateBlobFile('photos','photo',src,'id=1')
        data = db.GetOne('select photo from photos where id=1')

        # try/finally rather than "with": the file targets old interpreters.
        f = open(dest,'wb')
        try:
            f.write(data)
        finally:
            f.close()

        rs = db.Execute('select * from photos')
        while not rs.EOF:
            # Single-expression form prints the same text as the original
            # statement "print 'Fields=',rs.fields" on Python 2 and 3.
            print("Fields= %s" % (rs.fields,))
            rs.MoveNext()

        print("=======================")
        rows = db.GetAll('select * from photos where id<=1')
        print(rows)
    finally:
        # Always hand the connection back with its original settings.
        db.getLOBs = saveb
        db.debug = saved
# Smoke-test a connection object `db` by running queries against a table named
# adoxyz and printing the results, with pass/fail lines produced by _Test_Eq.
# The `debug` flag only controls one extra FetchField print.
# NOTE(review): indentation was lost in this listing, so the extent of each
# if/try/while body below has to be inferred from context.
def Test(db,debug=False):
db.DriverInfo()
# Disabled section (guarded by a constant False): Date/TimeStamp parsing and
# DBTimeStamp formatting.  Presumably covers the statements up to the
# useExceptions assignment -- confirm against the indented source.
if False:
d = db.Date('2004-03-21')
print '2004-03-21=',d
d = db.TimeStamp('2004-03-22 12:50:51')
print '2004-03-22 12:50:51=',d
print "DBTimeStamp=", db.DBTimeStamp(d)
db.useExceptions = True # use adodb error handling
# Query a table name (xadoxyz) that differs from the one used everywhere else;
# the handler's message shows an error is the expected outcome.
try:
sql = 'select * from xadoxyz where 0 < id and id < 3'
rs = db.Execute(sql)
_Test_Eq('Bad SQL',None, rs, sql)
except:
print "And you should see an error message indicating bad table was defined: "
print "err=",db.ErrorMsg()
print "-----"
# Walk a result set with the EOF / MoveNext() loop, printing each row.
rs = db.Execute('select * from ADOXYZ where 0 < id and id < 3 order by id')
while not rs.EOF:
print rs.fields
rs.MoveNext()
print "You should see 2 rows of data here:"
# Same query again, this time printing the row count and each row as returned
# by GetRowAssoc().
rs = db.Execute('select * from adoxyz where 0 < id and id < 3 order by id')
print "rows=",rs.RecordCount()
while (not rs.EOF):
print rs.GetRowAssoc()
rs.MoveNext()
print "-----"
# FetchField(1)[0] is compared case-insensitively with the second selected
# column name.
rs = db.Execute('select id,firstname from adoxyz where 0 < id and id < 3 order by id')
_Test_Eq("Test FetchField",'FIRSTNAME',rs.FetchField(1)[0].upper())
if (debug): print rs.FetchField(1)
# Drain the rows with FetchRow() until it returns None, checking that the
# running count matches the first column (id) of each row.
cnt = 0
while 1:
arr=rs.FetchRow()
if arr == None: break
cnt += 1
_Test_Eq('Execute 2.0',cnt,arr[0])
_Test_Eq('Execute 2.1',2,cnt)
# A RecordCount() of -1 is reported as "not supported" rather than a failure.
if rs.RecordCount() == -1: print "*** RecordCount not supported: -1"
else: _Test_Eq('Execute 2.1 RecordCount',2,rs.RecordCount())
# Delete then re-insert row 997.  The Affected_Rows check expects exactly one
# deleted row, so the row must already exist when the test starts.
rs = db.Execute("delete from adoxyz where id=997")
cnt = rs.Affected_Rows()
_Test_Eq('Affected_Rows',1,cnt)
ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (997,'python','snake')")
if not ok: _Test_Eq('DELETE/INSERT','inserted row','failed insert')
# GetRow is checked on both columns (firstname is right-stripped first);
# GetOne on the same query is expected to give 997, the first column.
row = db.GetRow("select id,firstname from adoxyz where id=997");
_Test_Eq('GetRow',str(997)+' '+'python',str(int(row[0]))+' '+row[1].rstrip(),row)
row = db.GetOne("select id,firstname from adoxyz where id=997");
_Test_Eq('GetOne',997,row)
# SelectLimit with a limit of 3: iterate the cursor and count the rows.
rs = db.SelectLimit("select id,firstname from adoxyz",3)
cnt = 0
try:
for row in rs:
cnt += 1
#print rs.fields
_Test_Eq('SelectLimit',3,cnt)
except:
print "Failed Iteration"
print sys.exc_info()[1]
# Pass a fetched `created` value through TimeStamp() and compare its string
# form with the first 19 characters of the original's string form.
d = db.GetOne('select created from adoxyz where id=1')
d2 = db.TimeStamp(d)
_Test_Eq('DBDate',str(d)[:19],str(d2))
# qstr may escape with backslashes or by doubling the quote; either result is
# accepted, anything else is reported as a failure.
if (db.qstr("\\show'boat") != "'\\\\show\\'boat'" and db.qstr("\\show'boat") != "'\\show''boat'"):
_Test_Eq('qstr',"qstr(\\show'boat)", db.qstr("\\show'boat"))
else:
_Test_Eq('qstr','1','1')
# GetAssoc / GetCol are only printed, not asserted; any exception is printed.
try:
db.debug=True
print "Testing GetAssoc"
arr = db.GetAssoc('select firstname,lastname from adoxyz')
print arr
print "Testing GetCol"
arr = db.GetCol('select firstname from adoxyz')
print arr
except:
print sys.exc_info()[1]
# MetaColumns output is printed, not asserted.
try:
print "MetaColumns:"
rows = db.MetaColumns('adoxyz')
print rows
except:
print "Failed MetaColumns"
print sys.exc_info()[1]
# Insert row 1997 inside a transaction, roll back, then expect GetOne to find
# nothing (None) for that id.
try:
db.BeginTrans()
ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (1997,'python','snake')")
db.RollbackTrans()
val = db.GetOne('select * from adoxyz where id=1997')
_Test_Eq('Rollback Test',None,val)
except:
print "Failed Rollback Test"
print sys.exc_info()[1]