Package backend :: Package server :: Package rhnSQL :: Module driver_cx_Oracle
[hide private]
[frames] | no frames]

Source Code for Module backend.server.rhnSQL.driver_cx_Oracle

  1  # 
  2  # Copyright (c) 2008--2017 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15  # 
 16  # Database driver for cx_Oracle 
 17  # 
 18  # As much as possible, keep functionality out of here. These classes should 
 19  # inherit from this in sql_base and generally, just wrap to catch Oracle 
 20  # specific exceptions and return generic ones. (or to deal with other Oracle 
 21  # one-offs) 
 22   
 23  import sql_base 
 24  import sql_types 
 25  import cx_Oracle 
 26  import sys 
 27  import string 
 28  import os 
 29  import re 
 30  from spacewalk.common import usix 
 31   
 32  from rhn.UserDictCase import UserDictCase 
 33  from spacewalk.common.usix import raise_with_tb, next 
 34  from spacewalk.server import rhnSQL 
 35  from spacewalk.common import rhnConfig 
 36  from spacewalk.common.rhnLog import log_debug, log_error 
 37  from spacewalk.common.rhnException import rhnException 
 38  from spacewalk.common.stringutils import to_string 
 39  from const import ORACLE 
 40   
 41  ORACLE_TYPE_MAPPING = [ 
 42      (sql_types.NUMBER, cx_Oracle.NUMBER), 
 43      (sql_types.STRING, cx_Oracle.STRING), 
 44      (sql_types.BINARY, cx_Oracle.BINARY), 
 45      (sql_types.LONG_BINARY, cx_Oracle.LONG_BINARY), 
 46  ] 
 47   
 48   
49 -class Cursor(sql_base.Cursor):
50 51 """ 52 Wrapper that should just transform Oracle specific exceptions into 53 something generic from sql_base. 54 """ 55 OracleError = cx_Oracle.DatabaseError 56
57 - def __init__(self, dbh, sql=None, force=None, blob_map=None):
58 59 try: 60 sql_base.Cursor.__init__(self, dbh=dbh, sql=sql, 61 force=force) 62 self._type_mapping = ORACLE_TYPE_MAPPING 63 self.blob_map = blob_map 64 except sql_base.SQLSchemaError: 65 e = sys.exc_info()[1] 66 (errno, errmsg) = e.errno, e.errmsg 67 if 900 <= errno <= 999: 68 # Per Oracle's documentation, SQL parsing error 69 raise_with_tb(sql_base.SQLStatementPrepareError(self.dbh, errmsg, self.sql), sys.exc_info()[2]) 70 # XXX: we should be handling the lost connection cases 71 # in here too, but we don't get that many of these and 72 # besides, this is much harder to get right 73 74 # XXX: Normally we expect the e.args to include a dump of 75 # the SQL code we just passed in since we're dealing with 76 # an OracleError. I hope this is always the case, of not, 77 # we'll have to log the sql code here 78 raise_with_tb(rhnException("Can not prepare statement", e.args), sys.exc_info()[2])
79
80 - def _prepare(self, force=None):
81 try: 82 return sql_base.Cursor._prepare(self, force) 83 except self.OracleError: 84 e = sys.exc_info()[1] 85 raise_with_tb(self._build_exception(e), sys.exc_info()[2])
86
87 - def _prepare_sql(self):
88 cursor = self.dbh.cursor() 89 90 if self.sql is not None: 91 # Oracle specific extension to the Python DB API: 92 cursor.prepare(self.sql) 93 94 return cursor
95
96 - def _execute_wrapper(self, function, *p, **kw):
97 params = ','.join(["%s: %s" % (repr(key), repr(value)) for key, value 98 in list(kw.items())]) 99 log_debug(5, "Executing SQL: \"%s\" with bind params: {%s}" 100 % (self.sql, params)) 101 if self.sql is None: 102 raise rhnException("Cannot execute empty cursor") 103 if self.blob_map: 104 blob_content = {} 105 for orig_blob_var in list(self.blob_map.keys()): 106 new_blob_var = orig_blob_var + '_blob' 107 blob_content[new_blob_var] = kw[orig_blob_var] 108 kw[new_blob_var] = self.var(cx_Oracle.BLOB) 109 del kw[orig_blob_var] 110 modified_params = self._munge_args(kw) 111 112 try: 113 retval = function(*p, **kw) 114 except self.OracleError: 115 e = sys.exc_info()[1] 116 ret = self._get_oracle_error_info(e) 117 if isinstance(ret, usix.StringType): 118 raise_with_tb(sql_base.SQLError(self.sql, p, kw, ret), sys.exc_info()[2]) 119 (errno, errmsg) = ret[:2] 120 if 900 <= errno <= 999: 121 # Per Oracle's documentation, SQL parsing error 122 raise_with_tb(sql_base.SQLStatementPrepareError(errno, errmsg, self.sql), sys.exc_info()[2]) 123 if errno == 1475: # statement needs to be reparsed; force a prepare again 124 if self.reparsed: # useless, tried that already. give up 125 log_error("Reparsing cursor did not fix it", self.sql) 126 args = ("Reparsing tried and still got this",) + tuple(ret) 127 raise_with_tb(sql_base.SQLError(*args), sys.exc_info()[2]) 128 self._real_cursor = self.dbh.prepare(self.sql) 129 self.reparsed = 1 130 self._execute_wrapper(function, *p, **kw) 131 elif 20000 <= errno <= 20999: # error codes we know we raise as schema errors 132 raise_with_tb(sql_base.SQLSchemaError(*ret), sys.exc_info()[2]) 133 raise_with_tb(sql_base.SQLError(*ret), sys.exc_info()[2]) 134 except ValueError: 135 # this is not good.Let the user know 136 raise 137 else: 138 self.reparsed = 0 # reset the reparsed counter 139 140 if self.blob_map: 141 for blob_var, content in blob_content.items(): 142 kw[blob_var].getvalue().write(content) 143 # Munge back the values 144 self._unmunge_args(kw, modified_params) 145 return retval
146
147 - def _execute_(self, args, kwargs):
148 """ 149 Oracle specific execution of the query. 150 """ 151 # TODO: args appears unused, raise exception if we see any? 152 153 # Only copy the arguments we're interested in 154 _p = UserDictCase(kwargs) 155 params = {} 156 157 # Check that all required parameters were provided: 158 # NOTE: bindnames() is Oracle specific: 159 for k in self._real_cursor.bindnames(): 160 if not _p.has_key(k): 161 # Raise the fault ourselves 162 raise sql_base.SQLError(1008, 'Not all variables bound', k) 163 params[k] = to_string(_p[k]) 164 165 # cx_Oracle expects the first arg to be the statement and no 166 # positional args: 167 try: 168 self._real_cursor.execute(*(None, ), **params) 169 except cx_Oracle.OperationalError: 170 e = sys.exc_info()[1] 171 raise sql_base.SQLError("Cannot execute SQL statement: %s" % str(e)) 172 173 self.description = self._real_cursor.description 174 return self._real_cursor.rowcount
175
176 - def _executemany(self, *args, **kwargs):
177 # cx_Oracle expects the first arg to be the statement 178 if not kwargs: 179 return 0 180 # Compute number of values 181 max_array_size = 25 182 i = iter(list(kwargs.values())) 183 firstval = next(i) 184 array_size = len(firstval) 185 if array_size == 0: 186 return 0 187 188 chunk_size = min(max_array_size, array_size) 189 pdict = {} 190 for k in kwargs.iterkeys(): 191 pdict[k] = None 192 arr = [] 193 for i in range(chunk_size): 194 arr.append(pdict.copy()) 195 196 # Now arr is an array of the desired size 197 rowcount = 0 198 start = 0 199 while start < array_size: 200 item_count = min(array_size - start, chunk_size) 201 # Trim the array if it is too big 202 if item_count != chunk_size: 203 arr = arr[:item_count] 204 205 for i in range(item_count): 206 pdict = arr[i] 207 for k, v in kwargs.items(): 208 pdict[k] = to_string(v[start + i]) 209 210 # We clear self->bindVariables so that list of all nulls 211 # in the previous chunk which caused the type to be set to 212 # string does not affect our chunk which may have number 213 # there. 214 self._real_cursor.setinputsizes(**{}) 215 216 # arr is now a list of dictionaries. Each dictionary contains the 217 # data for one execution of the query where the key is the column 218 # name and the value self explanatory. 219 self._real_cursor.executemany(None, arr) 220 self.description = self._real_cursor.description 221 222 rowcount = rowcount + self._real_cursor.rowcount 223 start = start + chunk_size 224 225 return rowcount
226
227 - def _get_oracle_error_info(self, error):
228 if isinstance(error, cx_Oracle.DatabaseError): 229 e = error[0] 230 if isinstance(e, cx_Oracle._Error): 231 return (e.code, e.message, self.sql) 232 return (None, str(error), self.sql)
233 234 # so we can "inherit" the self._real_cursor functions
235 - def __getattr__(self, name):
236 if hasattr(self._real_cursor, name): 237 return getattr(self._real_cursor, name) 238 raise AttributeError(name)
239 240 # deletion of the object
241 - def __del__(self):
242 self.reparsed = 0 243 self.dbh = self.sql = self._real_cursor = None
244
245 - def _build_exception(self, error):
246 ret = self._get_oracle_error_info(error) 247 if isinstance(ret, usix.StringType): 248 return sql_base.SQLError(ret) 249 return sql_base.SQLSchemaError(ret[0], ret[1])
250
251 - def _munge_arg(self, val):
252 for sqltype, dbtype in self._type_mapping: 253 if isinstance(val, sqltype): 254 var = self._real_cursor.var(dbtype, val.size) 255 var.setvalue(0, val.get_value()) 256 return var 257 258 # TODO: Find out why somebody flagged this with XXX? 259 # XXX 260 return val.get_value()
261
262 - def _unmunge_args(self, kw_dict, modified_params):
263 for k, v in modified_params: 264 v.set_value(kw_dict[k].getvalue())
265 266 # TODO: Don't think this is doing anything for PostgreSQL, maybe move to Oracle?
267 - def _munge_args(self, kw_dict):
268 modified = [] 269 for k, v in kw_dict.items(): 270 if not isinstance(v, sql_types.DatabaseDataType): 271 continue 272 vv = self._munge_arg(v) 273 modified.append((k, v)) 274 kw_dict[k] = vv 275 return modified
276
277 - def update_blob(self, table_name, column_name, where_clause, data, 278 **kwargs):
279 sql = "SELECT %s FROM %s %s FOR update of %s" % \ 280 (column_name, table_name, where_clause, column_name) 281 c = rhnSQL.prepare(sql) 282 c.execute(**kwargs) 283 row = c.fetchone_dict() 284 blob = row[column_name] 285 blob.write(data)
286 287
288 -class Procedure(sql_base.Procedure):
289 OracleError = cx_Oracle.DatabaseError 290
291 - def __init__(self, name, cursor):
292 sql_base.Procedure.__init__(self, name, cursor) 293 self._type_mapping = ORACLE_TYPE_MAPPING
294
295 - def __call__(self, *args):
296 """ 297 Wrap the __call__ method from the parent class to catch Oracle specific 298 actions and convert them to something generic. 299 """ 300 log_debug(2, self.name, args) 301 retval = None 302 try: 303 retval = self._call_proc(args) 304 except cx_Oracle.DatabaseError: 305 e = sys.exc_info()[1] 306 if not hasattr(e, "args"): 307 raise_with_tb(sql_base.SQLError(self.name, args), sys.exc_info()[2]) 308 elif 20000 <= e[0].code <= 20999: # error codes we know we raise as schema errors 309 310 raise_with_tb(sql_base.SQLSchemaError(e[0].code, str(e[0])), sys.exc_info()[2]) 311 312 raise_with_tb(sql_base.SQLError(e[0].code, str(e[0])), sys.exc_info()[2]) 313 except cx_Oracle.NotSupportedError: 314 error = sys.exc_info()[1] 315 raise_with_tb(sql_base.SQLError(*error.args), sys.exc_info()[2]) 316 return retval
317
318 - def _munge_args(self, args):
319 """ 320 Converts database specific argument types to those defined in sql_base. 321 """ 322 new_args = [] 323 for arg in args: 324 if not isinstance(arg, sql_types.DatabaseDataType): 325 new_args.append(arg) 326 continue 327 new_args.append(self._munge_arg(arg)) 328 return new_args
329
330 - def _munge_arg(self, val):
331 for sqltype, db_specific_type in self._type_mapping: 332 if isinstance(val, sqltype): 333 var = self.cursor.var(db_specific_type, val.size) 334 var.setvalue(0, val.get_value()) 335 return var 336 337 # XXX 338 return val.get_value()
339
340 - def _call_proc(self, args):
341 return self._call_proc_ret(args, ret_type=None)
342
343 - def _call_proc_ret(self, args, ret_type=None):
344 args = list(map(to_string, self._munge_args(args))) 345 if ret_type: 346 ret_type_mapped = False 347 for sqltype, db_type in self._type_mapping: 348 if isinstance(ret_type, sqltype): 349 ret_type = db_type 350 ret_type_mapped = True 351 break 352 if not ret_type_mapped: 353 raise Exception("Unknown type", ret_type) 354 355 if ret_type: 356 return self.cursor.callfunc(self.name, ret_type, args) 357 else: 358 return self.cursor.callproc(self.name, args)
359 360
361 -class Function(Procedure):
362
363 - def __init__(self, name, proc, ret_type):
364 Procedure.__init__(self, name, proc) 365 self.ret_type = ret_type
366
367 - def _call_proc(self, args):
368 return self._call_proc_ret(args, self.ret_type)
369 370
371 -class Database(sql_base.Database):
372 _cursor_class = Cursor 373 _procedure_class = Procedure 374 TimestampFromTicks = cx_Oracle.TimestampFromTicks 375 OracleError = cx_Oracle.DatabaseError 376
377 - def __init__(self, host=None, port=None, username=None, 378 password=None, database=None, sslmode=None, sslrootcert=None):
379 380 # Oracle requires enough info to connect 381 if not (username and password and database): 382 raise AttributeError("A valid Oracle username, password, and SID are required.") 383 if sslmode is not None: 384 raise AttributeError("Option sslmode is not supported for Oracle database backend.") 385 if sslrootcert is not None: 386 raise AttributeError("Option sslrootcert is not supported for Oracle database backend.") 387 388 sql_base.Database.__init__(self) 389 390 self.username = username 391 self.password = password 392 self.database = database 393 394 # dbtxt is the connection string without the password, to be used in exceptions 395 self.dbtxt = self.username + '@' + self.database 396 397 self.dbh = None 398 399 # self.stderr keeps the sys.stderr handle alive in case it gets 400 # collected too early. 401 self.stderr = sys.stderr
402
403 - def connect(self, reconnect=1):
404 log_debug(1, "Connecting to database", self.dbtxt) 405 self._fix_environment_vars() 406 try: 407 self.dbh = self._connect() 408 except self.OracleError: 409 e = sys.exc_info()[1] 410 ret = self._get_oracle_error_info(e) 411 if isinstance(ret, usix.StringType): 412 raise_with_tb(sql_base.SQLConnectError(self.dbtxt, -1, 413 "Unable to connect to database", ret), sys.exc_info()[2]) 414 (errno, errmsg) = ret[:2] 415 log_error("Connection attempt failed", errno, errmsg) 416 if reconnect: 417 # we don't try to reconnect blindly. We have a list of 418 # known "good" failure codes that warrant a reconnect 419 # attempt 420 if errno in [12547]: # lost contact 421 return self.connect(reconnect=0) 422 err_args = [self.dbtxt, errno, errmsg] 423 err_args.extend(list(ret[2:])) 424 raise_with_tb(sql_base.SQLConnectError(*err_args), sys.exc_info()[2]) 425 # else, this is a reconnect attempt 426 raise_with_tb(sql_base.SQLConnectError(*( 427 [self.dbtxt, errno, errmsg, 428 "Attempting Re-Connect to the database failed", ] + ret[2:])), sys.exc_info()[2]) 429 dbh_id = id(self.dbh) 430 # Reset the statement cache for this database connection 431 self._cursor_class._cursor_cache[dbh_id] = {}
432
433 - def _connect(self):
434 dbh = cx_Oracle.Connection(self.username, self.password, self.database) 435 if hasattr(sys, "argv"): 436 dbh.cursor().execute( 437 "BEGIN DBMS_APPLICATION_INFO.SET_MODULE('%s',NULL); END;" 438 % sys.argv[0]) 439 return dbh
440
441 - def is_connected_to(self, backend, host, port, username, password, 442 database, sslmode, sslrootcert):
443 # NOTE: host and port are unused for Oracle: 444 return (backend == ORACLE) and (self.username == username) and \ 445 (self.password == password) and (self.database == database) and \ 446 (sslmode is None) and (sslrootcert is None)
447 448 # try to close it first nicely
449 - def close(self):
450 if self.dbh is not None: 451 try: 452 self.dbh.close() 453 except: 454 pass 455 log_debug(1, "Closed DB database connection to %s" % self.dbtxt) 456 dbh_id = id(self.dbh) 457 _cursor_cache = self._cursor_class._cursor_cache 458 if dbh_id in _cursor_cache: 459 _cache = _cursor_cache[dbh_id] 460 for sql, cursor in _cache.items(): 461 # Close cursors 462 try: 463 cursor.close() 464 except: 465 pass 466 del _cursor_cache[dbh_id] 467 self.dbh = None
468
469 - def cursor(self):
470 return self._cursor_class(dbh=self.dbh)
471 472 # pass-through functions for when you want to do SQL yourself
473 - def prepare(self, sql, force=0, blob_map=None):
474 # Abuse the map calls to get rid of SQL comments and extra spaces 475 sql = string.join([a for a in list(map(string.strip, 476 [(a + " ")[:string.find(a, '--')] for a in string.split(sql, "\n")])) if len(a)], 477 " ") 478 if blob_map: 479 col_list = [] 480 bind_list = [] 481 for bind_var, column in blob_map.items(): 482 r = re.compile(":%s" % bind_var) 483 sql = re.sub(r, 'empty_blob()', sql) 484 col_list.append(column) 485 bind_list.append(":%s_blob" % bind_var) 486 sql += " returning %s into %s" % (','.join(col_list), ','.join(bind_list)) 487 # this way we only hit the network once for each sql statement 488 return self._cursor_class(dbh=self.dbh, sql=sql, force=force, blob_map=blob_map)
489
490 - def procedure(self, name):
491 try: 492 c = self.dbh.cursor() 493 except cx_Oracle.DatabaseError: 494 error = sys.exc_info()[1] 495 e = error[0] 496 raise_with_tb(sql_base.SQLSchemaError(e.code, e.message, e.context), sys.exc_info()[2]) 497 # Pass the cursor in so we can close it after execute() 498 return self._procedure_class(name, c)
499
500 - def _function(self, name, ret_type):
501 try: 502 c = self.dbh.cursor() 503 except cx_Oracle.DatabaseError: 504 error = sys.exc_info()[1] 505 e = error[0] 506 raise_with_tb(sql_base.SQLSchemaError(e.code, e.message, e.context), sys.exc_info()[2]) 507 return Function(name, c, ret_type)
508 509 # why would anybody need this?!
510 - def execute(self, sql, *args, **kwargs):
511 cursor = self.prepare(sql) 512 cursor.execute(*args, **kwargs) 513 return cursor
514 515 # transaction support
516 - def transaction(self, name):
517 if not name: 518 raise rhnException("Can not set a transaction without a name", name) 519 return self.execute("savepoint %s" % name)
520
521 - def commit(self):
522 log_debug(3, self.dbtxt) 523 if self.dbh is not None: 524 return self.dbh.commit()
525
526 - def rollback(self, name=None):
527 log_debug(3, self.dbtxt, name) 528 if name: # we need to roll back to a savepoint 529 return self.execute("rollback to savepoint %s" % name) 530 return self.dbh.rollback()
531
532 - def check_connection(self):
533 try: 534 h = self.prepare("select 1 from dual") 535 h.execute() 536 except: # try to reconnect, that one MUST WORK always 537 log_error("DATABASE CONNECTION TO '%s' LOST" % self.dbtxt, 538 "Exception information: %s" % sys.exc_info()[1]) 539 self.connect() # only allow one try 540 return 0
541 542 # function that attempts to fix the environment variables
543 - def _fix_environment_vars(self):
544 # Bugzilla 150452 On RHEL 4, for some reason, mod_perl tries to free 545 # an invalid pointer if we set an environment variable. 546 # If the environment variables are already set, this will be a noop 547 548 if "NLS_LANG" not in os.environ: 549 value = None 550 # Do we have a config object? 551 if rhnConfig.CFG.is_initialized(): 552 if rhnConfig.CFG.has_key("nls_lang"): 553 # Get the value from the configuration object 554 value = rhnConfig.CFG.nls_lang 555 if not value: 556 # Assign a default value 557 value = "english.UTF8" 558 os.environ["NLS_LANG"] = value
559 560 # Should return a sequence [code, message, ...] or an error message if no 561 # code is to be found
562 - def _get_oracle_error_info(self, error):
563 if isinstance(error, cx_Oracle.DatabaseError): 564 e = error[0] 565 return (e.code, e.message, e.context) 566 return str(error)
567
568 - def _read_lob(self, lob):
569 if not lob: 570 return None 571 return lob.read()
572
573 - def Date(self, year, month, day):
574 return cx_Oracle.Date(year, month, day)
575
576 - def DatetimeFromTicks(self, ticks):
577 return cx_Oracle.DatetimeFromTicks(ticks)
578