我们从 Python 开源项目中提取了以下 23 个代码示例,用于说明如何使用 pymysql.OperationalError()。
def ExecProcQuery(self, sql):
    """Execute ``sql`` on this object's connection and return every row.

    ``self.connect()`` is called first; when it returns ``None`` the
    ``conn`` attribute is cleared and ``None`` is returned without running
    the statement. Otherwise the statement is executed, all rows are
    fetched, the cursor is closed and the connection is committed.

    Any exception (``pymysql.OperationalError`` included) is reported
    through ``log.e``; the connection is closed if one is set and ``None``
    is returned instead of propagating the error.

    :param sql: Complete SQL text, handed to ``cursor.execute`` unchanged.
        NOTE(review): no parameter binding is done here, so callers must
        not assemble ``sql`` from untrusted input.
    :return: The result of ``cursor.fetchall()``, or ``None`` on failure.
    """
    try:
        if self.connect() is None:
            self.conn = None
            return None
        cur = self.conn.cursor()
        try:
            cur.execute(sql)
            resList = cur.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cur.close()
        self.conn.commit()
        return resList
    except Exception as e:
        # A single handler is enough: pymysql.OperationalError is an
        # Exception subclass and used to be handled by an identical branch.
        if self.conn is not None:
            self.conn.close()
        log.e('ExecProcQuery[%s,%s]\n' % (sql, str(e)))
        return None
def ExecProcNonQuery(self, sql):
    """Execute a statement that returns no rows and commit it.

    ``self.connect()`` is called first; when it returns ``None`` the
    ``conn`` attribute is cleared and ``False`` is returned without running
    the statement. Otherwise the statement is executed, the cursor is
    closed and the connection is committed.

    Any exception (``pymysql.OperationalError`` included) is reported
    through ``log.e``; the connection is closed if one is set and ``False``
    is returned instead of propagating the error.

    :param sql: Complete SQL text, handed to ``cursor.execute`` unchanged.
        NOTE(review): no parameter binding is done here, so callers must
        not assemble ``sql`` from untrusted input.
    :return: ``True`` when the statement ran and was committed, ``False``
        otherwise.
    """
    try:
        if self.connect() is None:
            self.conn = None
            return False
        cur = self.conn.cursor()
        try:
            cur.execute(sql)
        finally:
            # Release the cursor even when execute() raises.
            cur.close()
        self.conn.commit()
        return True
    except Exception as e:
        # A single handler is enough: pymysql.OperationalError is an
        # Exception subclass and used to be handled by an identical branch.
        if self.conn is not None:
            self.conn.close()
        log.e('ExecProcNonQuery[%s,%s]\n' % (sql, str(e)))
        return False
def ExecNonQuery(mysql, sql):
    """Execute a statement that returns no rows, without committing.

    ``mysql.connect()`` is called first; when it returns ``None`` the
    ``conn`` attribute of ``mysql`` is cleared and ``False`` is returned
    without running the statement.

    Any exception (``pymysql.OperationalError`` included) is reported
    through ``log.e``; the connection is closed if one is set and ``False``
    is returned instead of propagating the error.

    :param mysql: Object exposing ``connect()`` and a ``conn`` attribute.
        NOTE(review): presumably the same helper class whose methods are
        ``ExecProcQuery``/``ExecProcNonQuery`` -- confirm against callers.
    :param sql: Complete SQL text, handed to ``cursor.execute`` unchanged.
        NOTE(review): no parameter binding is done here, so callers must
        not assemble ``sql`` from untrusted input.
    :return: ``True`` when the statement ran, ``False`` otherwise.
    """
    try:
        if mysql.connect() is None:
            mysql.conn = None
            return False
        cur = mysql.conn.cursor()
        try:
            cur.execute(sql)
        finally:
            # Release the cursor even when execute() raises.
            cur.close()
        # No commit is issued here; the original had the call commented out.
        # NOTE(review): presumably the caller commits or autocommit is
        # enabled -- confirm.
        return True
    except Exception as e:
        # A single handler is enough: pymysql.OperationalError is an
        # Exception subclass and used to be handled by an identical branch.
        if mysql.conn is not None:
            mysql.conn.close()
        # The tag now names this function; it previously said
        # 'ExecProcNonQuery', which made log lines point at the wrong helper.
        log.e('ExecNonQuery[%s,%s]\n' % (sql, str(e)))
        return False
def find_synced_nodes(self):
    """Collect every node of the cluster that reports the SYNCED state.

    Each node in ``self._nodes`` is asked for its ``wsrep_local_state``.
    A node whose query raises ``OperationalError`` is logged and skipped.

    :return: All nodes whose state equals ``GaleraNodeState.SYNCED``,
        in the order they appear in ``self._nodes``.
    :rtype: list(GaleraNode)
    :raise: GaleraClusterSyncedNodeNotFound if no node is SYNCED.
    """
    LOG.debug('Looking for a SYNCED node')
    synced = []
    for galera_node in self._nodes:
        try:
            local_state = galera_node.wsrep_local_state
            LOG.debug('%s state: %s', galera_node, local_state)
            if local_state == GaleraNodeState.SYNCED:
                synced.append(galera_node)
        except OperationalError as err:
            # An unreachable node must not abort the scan of the others.
            LOG.error(err)
            LOG.info('Skipping node %s', galera_node)

    if not synced:
        raise GaleraClusterSyncedNodeNotFound('Cluster has '
                                              'no SYNCED nodes')
    return synced
def realTestPamAuth(self):
    """Exercise PAM-based authentication for the OS user against a live server.

    Steps, in order:

    1. Copy ``self.db`` and take the password from the ``PASSWORD``
       environment variable.
    2. Save the existing grants of ``<osuser>@localhost`` and drop that
       user; an ``OperationalError`` here must carry code 1045 and is
       treated as "user did not exist".
    3. Inside a ``TempUser`` context created with ``'pam'`` and the
       ``PAMSERVICE`` environment variable, connect as the OS user and
       assert that connecting through ``DefectiveHandler`` raises
       ``OperationalError``.
    4. Re-execute the saved grants statement, if any.

    NOTE(review): requires a running MySQL server and the environment
    variables above; the exact behaviour of ``TempUser`` and
    ``DefectiveHandler`` is defined outside this snippet.
    """
    db = self.db.copy()
    import os
    db['password'] = os.environ.get('PASSWORD')
    cur = self.connections[0].cursor()
    try:
        # Remember the current grants so the user can be restored at the end.
        cur.execute('show grants for ' + TestAuthentication.osuser + '@localhost')
        grants = cur.fetchone()[0]
        cur.execute('drop user ' + TestAuthentication.osuser + '@localhost')
    except pymysql.OperationalError as e:
        # assuming the user doesn't exist which is ok too
        self.assertEqual(1045, e.args[0])
        grants = None
    # NOTE(review): the context value ``u`` is never used below.
    with TempUser(cur, TestAuthentication.osuser + '@localhost',
                  self.databases[0]['db'], 'pam',
                  os.environ.get('PAMSERVICE')) as u:
        try:
            # NOTE(review): ``c`` is never used or closed afterwards.
            c = pymysql.connect(user=TestAuthentication.osuser, **db)
            # NOTE(review): the copy ``db`` gets a wrong password here, but the
            # connect calls below pass ``self.db``, not ``db`` -- confirm intent.
            db['password'] = 'very bad guess at password'
            with self.assertRaises(pymysql.err.OperationalError):
                pymysql.connect(user=TestAuthentication.osuser,
                                auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
                                **self.db)
        except pymysql.OperationalError as e:
            # Reached when the first connect above is rejected; the
            # assertRaises block swallows the error of the second connect.
            self.assertEqual(1045, e.args[0])
            # we had 'bad guess at password' work with pam. Well at least we get a permission denied here
            with self.assertRaises(pymysql.err.OperationalError):
                pymysql.connect(user=TestAuthentication.osuser,
                                auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
                                **self.db)
    if grants:
        # recreate the user
        cur.execute(grants)

# NOTE(review): the reference output below presumably belongs to a following
# test that is not part of this snippet.
# select old_password("crummy p\tassword");
#| old_password("crummy p\tassword") |
#| 2a01785203b08770 |
def test_issue_34(self):
    """Connecting to a port nobody listens on must fail with error 2003."""
    try:
        pymysql.connect(host="localhost", port=1237, user="root")
    except pymysql.OperationalError as err:
        # 2003 is the client-side "can't connect to server" code.
        self.assertEqual(2003, err.args[0])
    except Exception:
        # Any other exception type is the wrong kind of failure.
        self.fail()
    else:
        # The connection unexpectedly succeeded.
        self.fail()
def test_issue_35(self):
    """A query interrupted by a server kill must fail with error 2013.

    Manual test: the operator has to kill ``mysqld`` while the ten second
    sleep is running, as the printed instruction says.
    """
    cursor = self.connections[0].cursor()
    print("sudo killall -9 mysqld within the next 10 seconds")
    try:
        cursor.execute("select sleep(10)")
    except pymysql.OperationalError as err:
        # 2013 is the client-side "lost connection during query" code.
        self.assertEqual(2013, err.args[0])
    else:
        # The query finished, so the server was not killed in time.
        self.fail()
def test_affected_rows(self):
    """A connection that has not run anything reports zero affected rows."""
    rows_before = self.conn.affected_rows()
    self.assertEqual(rows_before, 0,
                     "Should return 0 before we do anything.")


# Disabled test kept from the original source:
#def test_debug(self):
#    ## FIXME Only actually tests if you lack SUPER
#    self.assertRaises(pymysql.OperationalError,
#                      self.conn.dump_debug_info)
def kill_connection(self):
    """Kill the thread of ``self.Con`` on a best-effort basis.

    The connection's own thread id is looked up and passed to its ``kill``
    method. ``pymysql.OperationalError`` and ``pymysql.InternalError`` are
    swallowed; every other exception propagates.
    """
    try:
        connection = self.Con
        own_thread = connection.thread_id()
        connection.kill(own_thread)
    except (pymysql.OperationalError, pymysql.InternalError):
        # Deliberately ignored: a failure here needs no further handling.
        # NOTE(review): presumably the connection is already gone -- confirm.
        pass
def test_issue_34(self):
    """Connecting to a port nobody listens on must fail with error 2003.

    Uses ``except ... as`` so the test parses on Python 3 as well as
    Python 2.6+, and catches ``Exception`` rather than everything so that
    ``KeyboardInterrupt``/``SystemExit`` are not turned into test failures.
    """
    try:
        pymysql.connect(host="localhost", port=1237, user="root")
    except pymysql.OperationalError as e:
        # 2003 is the client-side "can't connect to server" code.
        self.assertEqual(2003, e.args[0])
    except Exception:
        # Any other exception type is the wrong kind of failure.
        self.fail()
    else:
        # The connection unexpectedly succeeded.
        self.fail()
def test_issue_35(self):
    """A query interrupted by a server kill must fail with error 2013.

    Manual test: the operator has to kill ``mysqld`` while the ten second
    sleep is running, as the printed instruction says.

    Written with ``print(...)`` and ``except ... as`` so it parses on
    Python 3 as well as Python 2.6+.
    """
    conn = self.connections[0]
    c = conn.cursor()
    # A single parenthesised string prints identically on Python 2 and 3.
    print("sudo killall -9 mysqld within the next 10 seconds")
    try:
        c.execute("select sleep(10)")
    except pymysql.OperationalError as e:
        # 2013 is the client-side "lost connection during query" code.
        self.assertEqual(2013, e.args[0])
    else:
        # The query finished, so the server was not killed in time.
        self.fail()