python: Hack Adodb
Submitted by wd on Wed, 2007-01-24 12:31
เกร็ด
- ปกติเราไม่สามารถสร้าง database จากคำสั่ง connection ได้ เช่น
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','')
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "/usr/lib/python2.4/site-packages/adodb/adodb.py", line 199, in Connect
    self._connect(host,user,password,database)
  File "/usr/lib/python2.4/site-packages/adodb/adodb_postgres.py", line 46, in _connect
    self._conn = psycopg.connect(dsn)
psycopg.OperationalError: FATAL: database "user" does not exist
แต่เราสามารถทำโดยอ้อมได้ มีข้อแม้ว่าต้องมี database อยู่ก่อน ก็คือเราติดต่อเข้าไปหา database ที่มีอยู่ ในที่นี้เราใช้ postgres ตัว database ที่มีอยู่แน่นอนก็คือ database ชื่อ postgres แล้วจึงค่อยสร้างใหม่จากคำสั่ง Execute SQL
>>> conn=adodb.NewADOConnection('postgres')
>>> conn.Connect('host','user','password','postgres')
True
>>> cur=conn.Execute('CREATE DATABASE %s OWNER %s' % ('new_db','user'))
>>> conn.Connect('host','user','password','new_db')
True
โค้ดของโปรแกรม adodb.py
ยกโค้ดมาดูทั้งยวง ตัดทอนเหลือเฉพาะชื่อคลาสและฟังก์ชัน
########################################################################
# Vers 2.01 5 May 2006, (c)2004-2006 John Lim (jlim#natsoft.com.my) All Rights Reserved
# Released under a BSD-style license. See LICENSE.txt.
# Download: http://adodb.sourceforge.net/#pydownload
########################################################################

# NOTE(review): this listing was recovered from text in which all line breaks
# and indentation had been flattened.  Every code token is kept as published;
# the line layout and indentation are reconstructed and should be confirmed
# against the upstream adodb.py before the code is relied on.

__author__ = "John Lim (jlim#natsoft.com)"
__credits__ = "(c) 2004-2006 John Lim"

import exceptions,sys,re
from datetime import datetime

try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2
    True, False = 1, 0

# Lookup table from an upper-case column type name to a one-letter code.
# Judging only from how the keys are grouped, the codes appear to stand for:
# C = character, X = long text, B = binary, D = date, T = time/timestamp,
# L = boolean, R = counter/serial, I = integer, N = numeric.
# TODO confirm against the library documentation.
MapTypes = {
    'VARCHAR' : 'C',
    'VARCHAR2' : 'C',
    'CHAR' : 'C',
    'C' : 'C',
    'STRING' : 'C',
    'NCHAR' : 'C',
    'NVARCHAR' : 'C',
    'VARYING' : 'C',
    'BPCHAR' : 'C',
    'CHARACTER' : 'C',
    'INTERVAL' : 'C', # Postgres
    ##
    'LONGCHAR' : 'X',
    'TEXT' : 'X',
    'NTEXT' : 'X',
    'M' : 'X',
    'X' : 'X',
    'CLOB' : 'X',
    'NCLOB' : 'X',
    'LVARCHAR' : 'X',
    ##
    'BLOB' : 'B',
    'IMAGE' : 'B',
    'BINARY' : 'B',
    'VARBINARY' : 'B',
    'LONGBINARY' : 'B',
    'B' : 'B',
    ##
    'YEAR' : 'D', # mysql
    'DATE' : 'D',
    'D' : 'D',
    ##
    'TIME' : 'T',
    'TIMESTAMP' : 'T',
    'DATETIME' : 'T',
    'TIMESTAMPTZ' : 'T',
    'T' : 'T',
    ##
    'BOOL' : 'L',
    'BOOLEAN' : 'L',
    'BIT' : 'L',
    'L' : 'L',
    ##
    'COUNTER' : 'R',
    'R' : 'R',
    'SERIAL' : 'R', # ifx
    'INT IDENTITY' : 'R',
    ##
    'INT' : 'I',
    'INTEGER' : 'I',
    'INTEGER UNSIGNED' : 'I',
    'SHORT' : 'I',
    'TINYINT' : 'I',
    'SMALLINT' : 'I',
    'I' : 'I',
    ##
    'LONG' : 'N', # interbase is numeric, oci8 is blob
    'BIGINT' : 'N', # this is bigger than PHP 32-bit integers
    'DECIMAL' : 'N',
    'DEC' : 'N',
    'REAL' : 'N',
    'DOUBLE' : 'N',
    'DOUBLE PRECISION' : 'N',
    'SMALLFLOAT' : 'N',
    'FLOAT' : 'N',
    'NUMBER' : 'N',
    'NUM' : 'N',
    'NUMERIC' : 'N',
    'MONEY' : 'N',
    ## informix 9.2
    'SQLINT' : 'I',
    # NOTE(review): 'SERIAL' maps to 'R' above, but 'SQLSERIAL' maps to 'I'
    # here -- confirm that the difference is intended.
    'SQLSERIAL' : 'I',
    'SQLSMINT' : 'I',
    'SQLSMFLOAT' : 'N',
    'SQLFLOAT' : 'N',
    'SQLMONEY' : 'N',
    'SQLDECIMAL' : 'N',
    'SQLDATE' : 'D',
    'SQLVCHAR' : 'C',
    'SQLCHAR' : 'C',
    'SQLDTIME' : 'T',
    'SQLINTERVAL' : 'N',
    'SQLBYTES' : 'B',
    'SQLTEXT' : 'X'}

# ----------------------------------------------------------------------
# OUTLINE ONLY: from here down to the "UNIT TESTING" banner the listing
# shows class attributes and bare signatures.  The bodies are not part of
# the listing (except the `pass` in ADOConnection.__init__), so this
# section is a table of contents for the module, not runnable Python.
# ----------------------------------------------------------------------

class adodb_iter:
    def __iter__(self):
    def next(self):

# Two module-level entry points taking a module name.  The session shown
# earlier in the article calls NewADOConnection('postgres').
def NewADOConnection(modulename):
def ADONewConnection(modulename):

class ADOConnection:
    # Class-level defaults.
    databaseType = None
    dataProvider = 'native'
    host = None
    user = None
    password = None
    database = None
    replaceQuote = "\\'"
    useExceptions = True
    debug = None
    getLOBs = True
    hasRowCount = True
    metaColSQL = 'Invalid'
    fmtDate = '%Y-%m-%d'
    fmtTimeStamp = '%Y-%m-%d %H:%M:%S'
    _errormsg = ''
    _errno = 0
    _conn = None
    _autocommit = True
    _connected = True

    def __init__(self):
        pass

    def Connect(self,host=None,user=None,password=None,database=None):
    def IsConnected(self):
    def DriverInfo(self):
    def ErrorMsg(self):
    def ErrorNo(self):
    def qstr(self,s):
    def quote(self,s):
    def addq(self,s):
    def Conn(self):
    def _query(self,sql,params=None,_cursor=None):
    def SelectLimit(self,sql,limit,offset=-1,params=None):
    def Execute(self,sql,params=None):
    def UpdateBlob(self,table,field,blob,where,blobtype='BLOB'):
    def UpdateBlobFile(self,table,field,filepath,where,blobtype='BLOB'):
    def UpdateClob(self,table,field,blob,where):
    def GetRows(self,sql,params=None):
    def GetArray(self,sql,params=None):
    def GetAll(self,sql,params=None):
    # NOTE(review): GetRow appears twice in the listing.  In a class body a
    # later definition replaces an earlier one with the same name; whether
    # this is a copy artifact of the article should be checked upstream.
    def GetRow(self,sql,params=None):
    def GetRow(self,sql,params=None):
    def GetOne(self,sql,params=None):
    def GetCol(self, sql, params=None):
    def GetAssoc(self, sql, params=None):
    def GetDict(self, sql, params=None):
    def BeginTrans(self):
    def CommitTrans(self):
    def RollbackTrans(self):
    def Close(self):
    def DBDate(self,d):
    def DBTimeStamp(self,d):
    def Date(self,s):
    def TimeStamp(self,s):
    def MetaType(self, dbtype):
    def MetaColumns(self, table):

class ADOCursor:
    # Class-level defaults.
    _cursor = None
    fields = None
    EOF = False
    _rowcount = 0
    _isselect = False
    _insertid = 0
    _conn = None

    def __init__(self,rs,conn,norowcount=False):
    def __iter__(self):
    def RecordCount(self):
    def MoveNext(self):
    def FetchRow(self):
    # returns a tuple of the form (name, type_code,display_size, internal_size, precision, scale,null_ok)
    # note: databases could return name in upper or lower-case
    def FetchField(self,row):
    def Affected_Rows(self):
    def Insert_ID(self):
    def Cursor(self):
    def GetRowAssoc(self,upper=1):
    def Close(self):

#===========================================================
# UNIT TESTING
#===========================================================

def _Test_Eq(testid, correct, testval, errmsg=''):
    """Print the outcome of comparing an expected value with an actual one.

    Prints a "Passed Test" line when ``correct == testval``; otherwise
    prints a failure banner showing ``errmsg``, the expected value and the
    actual value.  Nothing is returned and nothing is raised on a mismatch.
    """
    if correct == testval:
        print "Passed Test: "+testid
    else:
        print ""
        print "********* Failed Test: "+testid
        print "********************** "+str(errmsg)
        print "********************** expected="+str(correct)
        print "********************** actual="+str(testval)

def Test_Blob(db):
    """Store a file as a blob through ``db``, read it back and dump rows.

    Uses the hard-coded paths ``c:/lensserver.gif`` (input) and
    ``c:/testpy1.gif`` (output) and a ``photos`` table with ``id`` and
    ``photo`` columns, as named in the statements below.  ``db.debug`` and
    ``db.getLOBs`` are forced to True and put back at the end; they are not
    put back if any call in between raises.
    """
    import os
    src = 'c:/lensserver.gif'
    dest = 'c:/testpy1.gif'
    # Remove output left by an earlier run; every error is ignored here.
    try:
        os.unlink(dest)
    except:
        pass

    saved = db.debug
    saveb = db.getLOBs
    db.debug = True
    db.getLOBs = True
    db.UpdateBlobFile('photos','photo',src,'id=1')
    data = db.GetOne('select photo from photos where id=1')
    # file() is the Python 2 builtin for opening files.
    f = file(dest,'wb')
    f.write(data)
    f.close()

    rs = db.Execute('select * from photos')
    while not rs.EOF:
        print 'Fields=',rs.fields
        rs.MoveNext()

    # NOTE(review): assumed to follow the loop rather than sit inside it;
    # the flattened text cannot settle this -- confirm upstream.
    print "======================="
    rows = db.GetAll('select * from photos where id<=1')
    print rows
    db.getLOBs = saveb
    db.debug = saved

def Test(db,debug=False):
    """Run a fixed sequence of printed checks against a connected ``db``.

    The statements below use a table ``adoxyz`` with columns ``id``,
    ``firstname``, ``lastname`` and ``created``.  Results are reported with
    ``_Test_Eq`` and print statements; nothing is returned.  Row 997 is
    deleted and re-inserted, and row 1997 is inserted inside a transaction
    that is then rolled back.

    NOTE(review): the expected values suggest the table must already hold
    rows with ids 1 and 2 and a row 997 -- confirm the required fixture.
    """
    db.DriverInfo()

    # Disabled block: never executed because the condition is the constant
    # False.  NOTE(review): which statements belong to this block is
    # reconstructed; `d` is not assigned anywhere else before its use, so
    # the DBTimeStamp print is assumed to be inside it.
    if False:
        d = db.Date('2004-03-21')
        print '2004-03-21=',d

        d = db.TimeStamp('2004-03-22 12:50:51')
        print '2004-03-22 12:50:51=',d

        print "DBTimeStamp=", db.DBTimeStamp(d)

    db.useExceptions = True # use adodb error handling
    try:
        # The table name "xadoxyz" is meant to be invalid (test id 'Bad SQL').
        sql = 'select * from xadoxyz where 0 < id and id < 3'
        rs = db.Execute(sql)
        # Reached only when Execute did not raise.
        _Test_Eq('Bad SQL',None, rs, sql)
    except:
        print "And you should see an error message indicating bad table was defined: "
        print "err=",db.ErrorMsg()

    # NOTE(review): assumed to follow the try/except rather than sit inside
    # the except clause.
    print "-----"
    rs = db.Execute('select * from ADOXYZ where 0 < id and id < 3 order by id')
    while not rs.EOF:
        print rs.fields
        rs.MoveNext()

    print "You should see 2 rows of data here:"
    rs = db.Execute('select * from adoxyz where 0 < id and id < 3 order by id')
    print "rows=",rs.RecordCount()
    while (not rs.EOF):
        print rs.GetRowAssoc()
        rs.MoveNext()

    print "-----"
    rs = db.Execute('select id,firstname from adoxyz where 0 < id and id < 3 order by id')
    _Test_Eq("Test FetchField",'FIRSTNAME',rs.FetchField(1)[0].upper())
    if (debug): print rs.FetchField(1)
    cnt = 0
    while 1:
        arr=rs.FetchRow()
        # NOTE(review): `is None` is the idiomatic spelling of this check.
        if arr == None: break
        cnt += 1
        # Each fetched id is compared with the running count (1, then 2).
        _Test_Eq('Execute 2.0',cnt,arr[0])

    _Test_Eq('Execute 2.1',2,cnt)
    if rs.RecordCount() == -1: print "*** RecordCount not supported: -1"
    else: _Test_Eq('Execute 2.1 RecordCount',2,rs.RecordCount())

    rs = db.Execute("delete from adoxyz where id=997")
    cnt = rs.Affected_Rows()
    _Test_Eq('Affected_Rows',1,cnt)

    ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (997,'python','snake')")
    if not ok: _Test_Eq('DELETE/INSERT','inserted row','failed insert')

    row = db.GetRow("select id,firstname from adoxyz where id=997");
    _Test_Eq('GetRow',str(997)+' '+'python',str(int(row[0]))+' '+row[1].rstrip(),row)

    # The query selects two columns but the result is compared with the
    # single value 997.
    row = db.GetOne("select id,firstname from adoxyz where id=997");
    _Test_Eq('GetOne',997,row)

    rs = db.SelectLimit("select id,firstname from adoxyz",3)
    cnt = 0

    try:
        for row in rs:
            cnt += 1
            #print rs.fields
        _Test_Eq('SelectLimit',3,cnt)
    except:
        print "Failed Iteration"
        print sys.exc_info()[1]

    d = db.GetOne('select created from adoxyz where id=1')
    d2 = db.TimeStamp(d)
    # Only the first 19 characters of str(d) take part in the comparison.
    _Test_Eq('DBDate',str(d)[:19],str(d2))

    # Two quoted forms are accepted: backslash-escaped, or with the single
    # quote doubled.  A failure is reported only when neither matches.
    if (db.qstr("\\show'boat") != "'\\\\show\\'boat'" and db.qstr("\\show'boat") != "'\\show''boat'"):
        _Test_Eq('qstr',"qstr(\\show'boat)", db.qstr("\\show'boat"))
    else:
        _Test_Eq('qstr','1','1')

    try:
        # debug is switched on here and not switched back off in this function.
        db.debug=True
        print "Testing GetAssoc"
        arr = db.GetAssoc('select firstname,lastname from adoxyz')
        print arr
        print "Testing GetCol"
        arr = db.GetCol('select firstname from adoxyz')
        print arr
    except:
        print sys.exc_info()[1]

    try:
        print "MetaColumns:"
        rows = db.MetaColumns('adoxyz')
        print rows
    except:
        print "Failed MetaColumns"
        print sys.exc_info()[1]

    try:
        # Insert inside a transaction, roll back, then expect no row 1997.
        db.BeginTrans()
        ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (1997,'python','snake')")
        db.RollbackTrans()
        val = db.GetOne('select * from adoxyz where id=1997')
        _Test_Eq('Rollback Test',None,val)
    except:
        print "Failed Rollback Test"
        print sys.exc_info()[1]
- Printer-friendly version
- Log in or register to post comments
- 7405 reads
Recent comments