我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用pymysql.IntegrityError()。
def process_item(self, item, spider):
    """Drop RSS items whose stored version was downloaded too recently.

    Items coming from any spider other than 'RssCrawler' are passed through
    untouched. For 'RssCrawler' items the query in ``self.compare_versions``
    is executed with the item's URL; if a row comes back, the item's
    download date is compared with column 3 of that row.

    Args:
        item: Mapping that provides at least 'url' and 'download_date'.
        spider: Object whose ``name`` attribute identifies the spider.

    Returns:
        The unchanged ``item`` when it should continue down the pipeline.

    Raises:
        DropItem: If the time between the stored download date and the
            item's download date is below ``self.delta_time`` hours.
    """
    if spider.name != 'RssCrawler':
        return item

    # Search the CurrentVersion table for a version of the article
    try:
        self.cursor.execute(self.compare_versions, (item['url'],))
    except (pymysql.err.OperationalError, pymysql.ProgrammingError,
            pymysql.InternalError, pymysql.IntegrityError,
            TypeError) as error:
        self.log.error("Something went wrong in rss query: %s", error)
        # The query did not run, so the cursor holds no result for this
        # item. Fetching from it would raise or return unrelated data;
        # let the item through instead.
        return item

    # Save the result of the query. Must be done before the add,
    # otherwise the result will be overwritten in the buffer
    old_version = self.cursor.fetchone()
    if old_version is None:
        return item

    # Index 3 of old_version corresponds to the download_date attribute
    # in the DB.
    # NOTE(review): "%y" parses a two-digit year - confirm this matches the
    # format in which download_date is produced.
    downloaded_at = datetime.datetime.strptime(
        item['download_date'], "%y-%m-%d %H:%M:%S")
    if downloaded_at - old_version[3] < datetime.timedelta(
            hours=self.delta_time):
        raise DropItem("Article in DB too recent. Not saving.")

    return item
def retire_in_db(self, dbconn):
    """Set ``active = 0`` on this object's row in ``trked_trans``.

    Args:
        dbconn: Object exposing ``execute(sql, params)``.

    Returns:
        dict: ``{"success": True}`` when the statement ran. On failure
        ``"success"`` is False, ``"failure_message"`` names the error
        category and ``"debug"`` carries the exception text. Unexpected
        exceptions additionally set ``"failure_short"``.
    """
    # The result dict must exist before any branch writes to it.
    return_dict = dict()

    # Bind the id as a parameter instead of concatenating it into the SQL:
    # concatenation fails for integer ids and is open to injection.
    retire_transaction_string = "UPDATE trked_trans set active = 0 where id = %s"

    try:
        dbconn.execute(retire_transaction_string, (self.dbid,))
        return_dict["success"] = True
    except pymysql.IntegrityError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "Integrity Error"
        return_dict["debug"] = str(e)
    except pymysql.ProgrammingError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "ProgrammingError"
        return_dict["debug"] = str(e)
    except pymysql.DataError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "DataError"
        return_dict["debug"] = str(e)
    except pymysql.NotSupportedError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "NotSupportedError"
        return_dict["debug"] = str(e)
    except pymysql.OperationalError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "OperationalError"
        return_dict["debug"] = str(e)
    except Exception as e:
        return_dict["success"] = False
        return_dict["failure_short"] = "Unknown Failure " + str(e)
        # Also provide the keys every other failure branch sets, so callers
        # can read them unconditionally.
        return_dict["failure_message"] = "Unknown Failure"
        return_dict["debug"] = str(e)

    return return_dict
def do_update(self, dbconn, updatedict):
    """Record a check attempt and refresh this transaction's row.

    Inserts a row into ``attempts`` referencing ``self.dbid`` and then
    updates ``lastChecked`` / ``active`` on the ``trked_trans`` row with
    that id.

    Args:
        dbconn: Object exposing ``execute(sql, params)`` and ``lastrowid``.
        updatedict: Mapping with the keys ``"result"`` and
            ``"still_valid"``.

    Returns:
        dict: On success ``"success"`` is True and ``"insert_id"`` holds
        ``dbconn.lastrowid`` as read right after the insert. On failure
        ``"success"`` is False, ``"failure_message"`` names the error
        category and ``"debug"`` carries the exception text. Unexpected
        exceptions additionally set ``"failure_short"``.

    Raises:
        KeyError: If ``updatedict`` lacks ``"result"`` or ``"still_valid"``.
    """
    return_dict = dict()

    # The two statements are issued separately: sending them in a single
    # execute() call only works on connections with multi-statement support
    # switched on.
    insert_attempt_sql = (
        "INSERT into attempts (fk_trked_trans_id, result) VALUES (%s, %s)")
    update_transaction_sql = (
        "UPDATE trked_trans set lastChecked = CURRENT_TIMESTAMP, "
        "active = %s where id = %s")

    # Built outside the try block so a malformed updatedict still raises
    # KeyError to the caller.
    attempt_values = (self.dbid, updatedict["result"])
    transaction_values = (updatedict["still_valid"], self.dbid)

    try:
        dbconn.execute(insert_attempt_sql, attempt_values)
        # Read the id before the UPDATE runs and replaces it. It identifies
        # the new attempts row, so it must not be stored in self.dbid, which
        # is used above as the trked_trans id.
        return_dict["insert_id"] = dbconn.lastrowid
        dbconn.execute(update_transaction_sql, transaction_values)
        return_dict["success"] = True
    except pymysql.IntegrityError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "Integrity Error"
        return_dict["debug"] = str(e)
    except pymysql.ProgrammingError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "ProgrammingError"
        return_dict["debug"] = str(e)
    except pymysql.DataError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "DataError"
        return_dict["debug"] = str(e)
    except pymysql.NotSupportedError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "NotSupportedError"
        return_dict["debug"] = str(e)
    except pymysql.OperationalError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "OperationalError"
        return_dict["debug"] = str(e)
    except Exception as e:
        return_dict["success"] = False
        return_dict["failure_short"] = "Unknown Failure " + str(e)
        # Also provide the keys every other failure branch sets, so callers
        # can read them unconditionally.
        return_dict["failure_message"] = "Unknown Failure"
        return_dict["debug"] = str(e)

    return return_dict
def add_to_database(self, dbconn):
    """Insert this transaction into ``trked_trans`` as an active row.

    A fresh UUID4 string is generated and stored in the ``deletestring``
    column alongside ``self.txid`` and ``self.txhex``.

    Args:
        dbconn: Object exposing ``execute(sql, params)`` and ``lastrowid``.

    Returns:
        dict: On success ``"success"`` is True, ``"insert_id"`` holds
        ``dbconn.lastrowid`` and ``"delete_string"`` the generated UUID
        string; ``self.dbid`` is set to the integer insert id. On failure
        ``"success"`` is False, ``"failure_message"`` names the error
        category and ``"debug"`` carries the exception text. Unexpected
        exceptions additionally set ``"failure_short"``.
    """
    return_dict = dict()

    delete_string = str(uuid.uuid4())
    values_to_insert = (self.txid, True, self.txhex, delete_string)
    add_transaction_string = (
        "INSERT into trked_trans (txid, active, hextx, deletestring) "
        "VALUES( %s, %s, %s, %s ) ;")

    try:
        dbconn.execute(add_transaction_string, values_to_insert)
        return_dict["insert_id"] = dbconn.lastrowid
        self.dbid = int(return_dict["insert_id"])
        return_dict["success"] = True
        return_dict["delete_string"] = delete_string
    except pymysql.IntegrityError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "Integrity Error (Transaction probably already in system)"
        return_dict["debug"] = str(e)
    except pymysql.ProgrammingError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "ProgrammingError"
        return_dict["debug"] = str(e)
    except pymysql.DataError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "DataError"
        return_dict["debug"] = str(e)
    except pymysql.NotSupportedError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "NotSupportedError"
        return_dict["debug"] = str(e)
    except pymysql.OperationalError as e:
        return_dict["success"] = False
        return_dict["failure_message"] = "OperationalError"
        return_dict["debug"] = str(e)
    except Exception as e:
        return_dict["success"] = False
        return_dict["failure_short"] = "Unknown Failure " + str(e)
        # Also provide the keys every other failure branch sets, so callers
        # can read them unconditionally.
        return_dict["failure_message"] = "Unknown Failure"
        return_dict["debug"] = str(e)

    return return_dict
def update_message_sent_status(message, status):
    """Upsert the sent status of a message into `generic_message_sent_status`.

    Nothing is written when the message has no 'message_id', has no 'mode',
    or when its mode is 'sms' or 'call'.

    Args:
        message: Mapping read via ``.get('message_id')`` and ``.get('mode')``.
        status: Value stored in the `status` column.

    Returns:
        None
    """
    msg_id = message.get('message_id')
    if not msg_id:
        return

    delivery_mode = message.get('mode')
    # Don't track this for twilio as those are kept track of separately. Make
    # use of this for email, and, as a side effect of that for slack
    if not delivery_mode or delivery_mode in ('sms', 'call'):
        return

    bind_params = {'message_id': msg_id, 'status': status}
    session = db.Session()
    try:
        session.execute(
            '''INSERT INTO `generic_message_sent_status` (`message_id`, `status`) VALUES (:message_id, :status) ON DUPLICATE KEY UPDATE `status` = :status''',
            bind_params)
        session.commit()
    except (DataError, IntegrityError, InternalError):
        # Best effort: a failed status write is logged, never raised.
        logger.exception('Failed setting message sent status for message %s', message)
    finally:
        session.close()
def reset_mysql(self):
    """
    Resets the MySQL database.

    Truncates the tables CurrentVersions and ArchiveVersions. Unless
    ``self.no_confirm`` is truthy, the user is asked on stdin first and the
    reset only proceeds if the answer contains 'yes'.

    Connection settings are read from ``self.mysql`` (keys "host", "port",
    "db", "username", "password"). Database errors are logged through
    ``self.log`` rather than raised.
    """
    confirm = self.no_confirm

    print(""" Cleanup MySQL database: This will truncate all tables and reset the whole database. """)

    if not confirm:
        # confirm is falsy in this branch, so the {yes} placeholder always
        # renders as an empty string.
        confirm = 'yes' in builtins.input(
            """ Do you really want to do this? Write 'yes' to confirm: {yes}"""
            .format(yes='yes' if confirm else ''))

    if not confirm:
        print("Did not type yes. Thus aborting.")
        return

    print("Resetting database...")

    try:
        # initialize DB connection
        self.conn = pymysql.connect(host=self.mysql["host"],
                                    port=self.mysql["port"],
                                    db=self.mysql["db"],
                                    user=self.mysql["username"],
                                    passwd=self.mysql["password"])
        try:
            self.cursor = self.conn.cursor()
            self.cursor.execute("TRUNCATE TABLE CurrentVersions")
            self.cursor.execute("TRUNCATE TABLE ArchiveVersions")
        finally:
            # Close the connection even when a statement fails, so it is
            # not leaked.
            self.conn.close()
    except (pymysql.err.OperationalError, pymysql.ProgrammingError,
            pymysql.InternalError, pymysql.IntegrityError,
            TypeError) as error:
        self.log.error("Database reset error: %s", error)
def idx_agent(self):
    """Return the agent's IDX value, creating database rows when missing.

    Args:
        None

    Returns:
        idx_agent: IDX value of agent from database

    """
    # Pull the identifying fields out of the agent payload
    name = self.agent_data['agent_name']
    uid = self.agent_data['id_agent']

    # Nothing to insert when the agent is already in the table
    known_agent = db_agent.GetIDAgent(uid)
    if known_agent.exists() is True:
        return known_agent.idx_agent()

    # Resolve idx_agentname, inserting the agent name first if needed
    name_lookup = db_agentname.GetAgentName(name)
    if name_lookup.exists() is not False:
        idx_agentname = name_lookup.idx_agentname()
    else:
        name_record = AgentName(name=general.encode(name))
        name_db = db.Database()
        try:
            # NOTE(review): second argument is presumably an error code -
            # confirm against db.Database.add.
            name_db.add(name_record, 1145)
        except pymysql.IntegrityError:
            # A duplicate agent name is possible on a brand new database
            # when several agents post updates at once. This is OK.
            #
            # The original author expected 'pymysql.err.IntegrityError'
            # here but noted that, for some reason, it could not be caught.
            pass

        # Read the name back to obtain its index
        idx_agentname = db_agentname.GetAgentName(name).idx_agentname()

    # Add the agent record to the database
    agent_record = Agent(
        id_agent=general.encode(uid),
        idx_agentname=idx_agentname)
    agent_db = db.Database()
    agent_db.add(agent_record, 1081)

    # Get idx_agent value from database
    stored_agent = db_agent.GetIDAgent(uid)
    return stored_agent.idx_agent()