我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用pymysql.ProgrammingError()。
def process_item(self, item, spider): if spider.name == 'RssCrawler': # Search the CurrentVersion table for a version of the article try: self.cursor.execute(self.compare_versions, (item['url'],)) except (pymysql.err.OperationalError, pymysql.ProgrammingError, pymysql.InternalError, pymysql.IntegrityError, TypeError) as error: self.log.error("Something went wrong in rss query: %s", error) # Save the result of the query. Must be done before the add, # otherwise the result will be overwritten in the buffer old_version = self.cursor.fetchone() if old_version is not None: # Compare the two download dates. index 3 of old_version # corresponds to the download_date attribute in the DB if (datetime.datetime.strptime( item['download_date'], "%y-%m-%d %H:%M:%S") - old_version[3]) \ < datetime.timedelta(hours=self.delta_time): raise DropItem("Article in DB too recent. Not saving.") return item
def setUp(self): _skip_if_no_pymysql() import pymysql try: # Try Travis defaults. # No real user should allow root access with a blank password. self.conn = pymysql.connect(host='localhost', user='root', passwd='', db='pandas_nosetest') except: pass else: return try: self.conn = pymysql.connect(read_default_group='pandas') except pymysql.ProgrammingError: raise nose.SkipTest( "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") except pymysql.Error: raise nose.SkipTest( "Cannot connect to database. " "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ")
def has_database(self, db_name): """ Check if the database 'db_name' exists on the current connection. Parameters ---------- db_name : str The name of the database Returns ------- b : bool True if the database exists, or False otherwise. """ if self.db_type == SQL_MYSQL: with self.engine.connect() as connection: results = connection.execute("SHOW DATABASES") try: for x in results: if x[0] == db_name.split()[0]: return db_name except pymysql.ProgrammingError as ex: warnings.warn(ex) raise ex return False elif self.db_type == SQL_SQLITE: return os.path.exists(SqlDB.sqlite_path(db_name))
def tableExists(connection, tablename): try: if not tableIsEmpty(connection, tablename): return True except pymysql.ProgrammingError as e: return False
def table_exists(connection, tablename): try: if get_number_of_rows(connection, tablename) > 0: return True except pymysql.ProgrammingError: return False
def __get_top_for_tables(self, tables, top=30): tables_information = {} cursor = self.__conn.cursor() for table in tables: tables_information[table] = {'rows': []} if top > 0: try: self.__logger.debug('Getting {top} rows for table {table}'.format(top=top, table=table)) cursor.execute('SELECT * FROM {schema}.{table} LIMIT {top}'.format(top=top, table=table, schema=self.__db_name)) for row in cursor.fetchall(): table_row = [] for column in row: try: if type(column) is unicode: column = unicodedata.normalize('NFKD', column).encode('iso-8859-1', 'replace') else: column = str(column).decode('utf8', 'replace').encode('iso-8859-1', 'replace') if self.__illegal_characters.search(column): column = re.sub(self.__illegal_characters, '?', column) if column == 'None': column = 'NULL' except: column = 'Parse_error' table_row.append(column) tables_information[table]['rows'].append(table_row) except pymysql.ProgrammingError: tables_information[table]['rows'].append( 'Error getting table data {error}'.format(error=pymysql.ProgrammingError.message)) return tables_information
def fetchall(self): try: return self.cursor.fetchall() except pymysql.ProgrammingError, msg: logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % msg[1]) return None
def execute(self, query): retVal = False try: self.cursor.execute(query) retVal = True except (pymysql.OperationalError, pymysql.ProgrammingError), msg: logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % msg[1]) except pymysql.InternalError, msg: raise SqlmapConnectionException(msg[1]) self.connector.commit() return retVal
def setUpClass(cls): _skip_if_no_pymysql() # test connection import pymysql try: # Try Travis defaults. # No real user should allow root access with a blank password. pymysql.connect(host='localhost', user='root', passwd='', db='pandas_nosetest') except: pass else: return try: pymysql.connect(read_default_group='pandas') except pymysql.ProgrammingError: raise nose.SkipTest( "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") except pymysql.Error: raise nose.SkipTest( "Cannot connect to database. " "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ")
def retire_in_db(self, dbconn) : retire_transaction_string = "UPDATE trked_trans set active = 0 where id = " + self.dbid try: dbconn.execute(retire_transaction_string) return_dict["success"] = True except pymysql.IntegrityError as e : return_dict["success"] = False return_dict["failure_message"] = "Integrity Error" return_dict["debug"] = str(e) except pymysql.ProgrammingError as e : return_dict["success"] = False return_dict["failure_message"] = "ProgrammingError" return_dict["debug"] = str(e) except pymysql.DataError as e : return_dict["success"] = False return_dict["failure_message"] = "DataError" return_dict["debug"] = str(e) except pymysql.NotSupportedError as e : return_dict["success"] = False return_dict["failure_message"] = "NotSupportedError" return_dict["debug"] = str(e) except pymysql.OperationalError as e : return_dict["success"] = False return_dict["failure_message"] = "OperationalError" return_dict["debug"] = str(e) except Exception as e : return_dict["success"] = False return_dict["failure_short"] = "Unknown Failure " + str(e) return return_dict
def do_update(self, dbconn, updatedict): return_dict = dict() do_update_sql = "INSERT into attempts (fk_trked_trans_id, result) VALUES (%s, %s) ; " +\ "UPDATE trked_trans set lastChecked = CURRENT_TIMESTAMP, active = %s where id = %s ; " values_to_insert = (self.dbid, updatedict["result"], updatedict["still_valid"], self.dbid ) try: dbconn.execute(do_update_sql, values_to_insert) return_dict["insert_id"] = dbconn.lastrowid self.dbid = int(return_dict["insert_id"]) return_dict["success"] = True except pymysql.IntegrityError as e : return_dict["success"] = False return_dict["failure_message"] = "Integrity Error" return_dict["debug"] = str(e) except pymysql.ProgrammingError as e : return_dict["success"] = False return_dict["failure_message"] = "ProgrammingError" return_dict["debug"] = str(e) except pymysql.DataError as e : return_dict["success"] = False return_dict["failure_message"] = "DataError" return_dict["debug"] = str(e) except pymysql.NotSupportedError as e : return_dict["success"] = False return_dict["failure_message"] = "NotSupportedError" return_dict["debug"] = str(e) except pymysql.OperationalError as e : return_dict["success"] = False return_dict["failure_message"] = "OperationalError" return_dict["debug"] = str(e) except Exception as e : return_dict["success"] = False return_dict["failure_short"] = "Unknown Failure " + str(e) return return_dict
def add_to_database(self, dbconn): # Add to Database return_dict = dict() delete_string = str(uuid.uuid4()) values_to_insert = (self.txid, True, self.txhex, delete_string) add_transaction_string = "INSERT into trked_trans (txid, active, hextx, deletestring) VALUES( %s, %s, %s, %s ) ;" try: dbconn.execute(add_transaction_string, values_to_insert) return_dict["insert_id"] = dbconn.lastrowid self.dbid = int(return_dict["insert_id"]) return_dict["success"] = True return_dict["delete_string"] = delete_string except pymysql.IntegrityError as e : return_dict["success"] = False return_dict["failure_message"] = "Integrity Error (Transaction probably already in system)" return_dict["debug"] = str(e) except pymysql.ProgrammingError as e : return_dict["success"] = False return_dict["failure_message"] = "ProgrammingError" return_dict["debug"] = str(e) except pymysql.DataError as e : return_dict["success"] = False return_dict["failure_message"] = "DataError" return_dict["debug"] = str(e) except pymysql.NotSupportedError as e : return_dict["success"] = False return_dict["failure_message"] = "NotSupportedError" return_dict["debug"] = str(e) except pymysql.OperationalError as e : return_dict["success"] = False return_dict["failure_message"] = "OperationalError" return_dict["debug"] = str(e) except Exception as e : return_dict["success"] = False return_dict["failure_short"] = "Unknown Failure " + str(e) return return_dict
def reset_mysql(self): """ Resets the MySQL database. """ confirm = self.no_confirm print(""" Cleanup MySQL database: This will truncate all tables and reset the whole database. """) if not confirm: confirm = 'yes' in builtins.input( """ Do you really want to do this? Write 'yes' to confirm: {yes}""" .format(yes='yes' if confirm else '')) if not confirm: print("Did not type yes. Thus aborting.") return print("Resetting database...") try: # initialize DB connection self.conn = pymysql.connect(host=self.mysql["host"], port=self.mysql["port"], db=self.mysql["db"], user=self.mysql["username"], passwd=self.mysql["password"]) self.cursor = self.conn.cursor() self.cursor.execute("TRUNCATE TABLE CurrentVersions") self.cursor.execute("TRUNCATE TABLE ArchiveVersions") self.conn.close() except (pymysql.err.OperationalError, pymysql.ProgrammingError, pymysql.InternalError, pymysql.IntegrityError, TypeError) as error: self.log.error("Database reset error: %s", error)
def explain(self, S): """ Explain a MySQL query. The output of the EXPLAIN command is formatted as a table, and then logged to the logger as an INFO. Parameters ---------- S : string The MySQL string to be explained. """ command = S.partition(" ")[0].upper() if command in ["SHOW", "DESCRIBE", "SET", "RESET"]: return try: explain_table = self.connection.execute("EXPLAIN %s" % S) except pymysql.ProgrammingError as e: raise SQLProgrammingError(S + "\n"+ "%s" % e) else: explain_table_rows = [[x[0] for x in explain_table.description]] for x in explain_table: explain_table_rows.append([str(y) for y in x]) explain_column_width = [len(x[0]) for x in explain_table.description] for current_row in explain_table_rows: for i, x in enumerate(current_row): explain_column_width[i] = max(explain_column_width[i], len(x)) format_string = " | ".join(["%%-%is" % x for x in explain_column_width]) line_string = "-" * (sum(explain_column_width) - 3 + 3 * len(explain_column_width)) log_rows = ["EXPLAIN %s" % S] log_rows.append(line_string) log_rows.append(format_string % tuple(explain_table_rows[0])) log_rows.append(line_string) for x in explain_table_rows[1:]: log_rows.append(format_string % tuple(x)) log_rows.append(line_string) logger.debug("\n".join(log_rows))