# Jython Database Specification API 2.0 # # $Id: dbextstest.py 2101 2002-05-10 16:11:41Z bzimmer $ # # Copyright (c) 2001 brian zimmer import dbexts, runner, tempfile, os from random import random class dbextsTestCase(runner.SQLTestCase): def setUp(self): template = """ [default] name=dbexts_test [jdbc] name=dbexts_test url=%s user=%s pwd=%s driver=%s """ args = {} for arg in self.factory.arguments: args[arg[0]] = arg[1] template = template % (args["url"], args["usr"], args["pwd"], args["driver"]) if hasattr(self, "datahandler"): template += "\tdatahandler=%s" % (self.datahandler.__name__) template = os.linesep.join(template.split()) try: fp = open(tempfile.mktemp(), "w") fp.write(template) fp.close() self.db = dbexts.dbexts(cfg=fp.name) self.db.verbose = 0 for table in ("one", "two"): try: self.db.raw("drop table %s" % (table)) except: pass self.db.raw("create table one (a int, b int, c varchar(32))") self.db.raw("create table two (a int, b int, c varchar(32))") finally: try: os.remove(fp.name) except: pass def tearDown(self): self.db.raw("drop table one") self.db.raw("drop table two") self.db.close() def testChoose(self): """testing choose()""" r = dbexts.choose(1, 4, 5) assert r == 4, "choose failed, expected 4, got %d" % r def _insertInto(self, table, num): for i in range(0, num): self.db.raw("insert into %s (a, b, c) values (?, ?, ?)" % (table), [(i, random()*100+i, "%s" % (random()*100+i))]) def testSqlFailure(self): """testing isql with sql exception""" try: self.db.isql("select * from __garbage__") self.fail("expected SQL exception") except: pass def testSqlFailureWithBeginCommit(self): """testing failure with begin/commit""" failed = 0 c = self.db.begin() try: try: c.execute("select * from __garbage__") except: failed = 1 finally: self.db.commit(c) if not failed: self.fail("expected SQL exception") def testSqlWithBeginCommit(self): """testing begin/commit""" self._insertInto("two", 30) c = self.db.begin() c.execute("select * from two") f = c.fetchall() c.close() self.db.commit() assert len(f) == 30, "expected [30], got [%d]" % (len(f)) def testQueryMultipleReturnSets(self): """testing multiple return sets""" self._insertInto("two", 30) h, r = self.db.raw("select * from two where a = ?", [(0,), (3,)]) assert len(r) == 2, "expected [2], got [%d]" % (len(r)) def testUpdateCount(self): """testing update count""" self._insertInto("one", 45) self.db.raw("delete from one where a > ?", [(12,)]) self.assertEquals(32, self.db.updatecount) def testQueryWithMaxRows(self): """testing query with max rows""" self._insertInto("one", 45) self.db.raw("select * from one", maxrows=3) self.assertEquals(3, len(self.db.results)) self.db.raw("select * from one where a > ?", [(12,)], maxrows=3) self.assertEquals(3, len(self.db.results)) def testBulkcopy(self): """testing bcp""" self._insertInto("two", 3) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) assert len(bcp.columns) == 1, "one column should be specified, [%d] found" % (len(bcp.columns)) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a', 'b', 'c'], exclude=['a']) assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) assert a, "expecting ['b', 'c'], found %s" % (str(a)) class _executor: def __init__(self, table, cols): self.cols = cols if cols: self.sql = "insert into %s (%s) values (%s)" % (table, ",".join(self.cols), ",".join(("?",) * len(self.cols))) else: self.sql = "insert into %s values (%%s)" % (table) def execute(self, db, rows, bindings): assert len(rows) > 0, "must have at least one row" if self.cols: sql = self.sql else: sql = self.sql % (",".join(("?",) * len(rows[0]))) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a'], executor=_executor) done = bcp.transfer(self.db) assert done == 3, "expecting three rows to be handled but not inserted, found [%d]" % (done) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) done = bcp.transfer(self.db) assert done == 3, "expecting three rows to be inserted, found [%d]" % (done) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) bcp.rowxfer([200]) bcp.rowxfer([201]) bcp.rowxfer([202]) bcp.rowxfer([203]) done = bcp.batch() assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) bcp.rowxfer([300]) bcp.rowxfer([401]) bcp.rowxfer([502]) bcp.rowxfer([603]) done = bcp.batch() assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) bcp.rowxfer([205]) bcp.rowxfer([210]) done = bcp.done() assert done == 2, "expecting two rows to be inserted, found [%d]" % (done) assert bcp.total == 10, "expecting 10 rows to be inserted, found [%d]" % (bcp.total) bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) done = bcp.transfer(self.db) assert done == 16, "expecting sixteen rows to be inserted, found [%d]" % (done) def testTable(self): """testing dbexts.table(tabname)""" self.db.table("one") assert self.db.results is not None, "results were None" self.assertEquals(3, len(self.db.results)) self.db.table() found = 0 for a in self.db.results: if a[2].lower() in ("one", "two"): found += 1 self.assertEquals(2, found) def testOut(self): """testing dbexts.out""" self.db.verbose = 1 fp = open(tempfile.mktemp(), "w") try: self.db.out = fp self.db.raw("insert into one (a, b) values (?, ?)", [(1, 2), (3, 4)]) self.db.isql("select * from one") self.db.verbose = 0 fp.close() fp = open(fp.name, "r") data = fp.read() assert len(data), "expected file to contain output" finally: fp.close() os.remove(fp.name) def testResultSetWrapper(self): """testing result set wrapper""" from dbexts import ResultSet self._insertInto("two", 30) h, r = self.db.raw("select * from two where a in (?, ?, ?, ?) order by a", [(12,15,17,8)]) assert len(r) == 4, "expected [4], got [%d]" % (len(r)) rs = ResultSet(map(lambda x: x[0], h), r) assert len(rs[0]) == 3, "expected [3], got [%d]" % (len(rs[0])) assert rs[0]['a'] == 8, "expected [8], got [%s]" % (rs[0]['a']) assert rs[0]['A'] == 8, "expected [8], got [%s]" % (rs[0]['A']) assert len(rs[0]['b':]) == 2, "expected [2], got [%s]" % (len(rs[0]['b':])) assert len(rs[0]['a':'b']) == 1, "expected [1], got [%s]" % (len(rs[0]['a':'b'])) def testMultipleResultSetConcatentation(self): """testing multiple result sets with some resulting in None""" self._insertInto("two", 30) # first is non None h, r = self.db.raw("select * from two where a = ?", [(12,),(8001,),(15,),(17,),(8,),(9001,)]) assert len(r) == 4, "expected [4], got [%d]" % (len(r)) # first is None h, r = self.db.raw("select * from two where a = ?", [(1200,),(8001,),(15,),(17,),(8,),(9001,)]) assert len(r) == 3, "expected [3], got [%d]" % (len(r)) def testBulkcopyWithDynamicColumns(self): """testing bcp with dynamic column names""" self.testBulkcopy() bcp = self.db.bulkcopy("dbexts_test", "two", exclude=['a']) assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) assert a == ['b', 'c'], "expecting ['b', 'c'], found %s" % (str(a)) bcp = self.db.bulkcopy("dbexts_test", "two") done = bcp.transfer(self.db) assert done == 32, "expecting thirty two rows to be inserted, found [%d]" % (done) def testAutocommit(self): """testing the autocommit functionality""" for u in (0, 1): self.db.autocommit = u try: self.db.isql("select * from one") except Exception, e: fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) for v in (0, 1): self.db.db.autocommit = v try: self.db.isql("select * from one") except Exception, e: self.fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) def testPrepare(self): """testing the handling of a prepared statement""" self._insertInto("one", 10) p = self.db.prepare("select * from one") self.db.isql(p) self.db.isql(p) p.close() assert p.closed