我们从 Python 开源项目中提取了以下 18 个代码示例,用于说明如何使用 pymysql.cursors 模块。
def execute_cursor(self, S, server_side=False):
    """Execute the SQL statement ``S`` and return the cursor it ran on.

    Rows come back as dictionaries keyed by column name for both
    supported back ends.

    Args:
        S: SQL statement to run; surrounding whitespace is stripped.
        server_side: For MySQL, request ``SSDictCursor`` instead of
            ``DictCursor``. Not consulted for SQLite.

    Returns:
        The cursor on which ``S`` has been executed.

    Raises:
        ValueError: If ``self.db_type`` is neither ``SQL_MYSQL`` nor
            ``SQL_SQLITE``.
    """

    def dict_factory(cursor, row):
        # Row factory for SQLite: map each column name to the row's value.
        return {
            column[0]: row[i]
            for i, column in enumerate(cursor.description)
        }

    S = S.strip()
    if options.cfg.explain_queries:
        self.explain(S)
    logger.debug(S)

    if self.db_type == SQL_MYSQL:
        # Obtain a fresh connection when the current one reports it is
        # not open.
        if not self.Con.open:
            self.Con = self.get_connection()
        if server_side:
            cursor_class = pymysql.cursors.SSDictCursor
        else:
            cursor_class = pymysql.cursors.DictCursor
        cursor = self.Con.cursor(cursor_class)
    elif self.db_type == SQL_SQLITE:
        con = self.get_connection()
        con.row_factory = dict_factory
        cursor = con.cursor()
    else:
        # Previously this fell through and failed with an
        # UnboundLocalError on `cursor`; fail with a clear message instead.
        raise ValueError(
            "Unsupported database type: {!r}".format(self.db_type))

    cursor.execute(S)
    return cursor
def create_connection(self, host, user, passwd, database):
    """Open a MySQL connection and store it on ``self.connection``.

    The connection uses the ``utf8mb4`` charset and ``DictCursor`` as its
    cursor class.

    Args:
        host: Database server host.
        user: User name to log in as.
        passwd: Password for ``user``.
        database: Name of the database to use.

    Returns:
        0 on success, 2 if connecting raised a PyMySQL error.
    """
    try:
        # Keyword arguments: PyMySQL 1.0+ rejects positional ones.
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=passwd,
            database=database,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
    except pymysql.err.MySQLError as exception:
        # MySQLError is the base class of DatabaseError and IntegrityError,
        # so this covers everything the original tuple listed.
        sys.stderr.write(str(exception))
        # getattr: the attribute may never have been assigned if the very
        # first connection attempt fails.
        if getattr(self, 'connection', None) is None:
            sys.stderr.write("Problem connecting to database\n")
        return 2
    # The success code is returned here rather than from a `finally`
    # block, where it used to override the failure code above.
    return 0
def create_connection(self, host, user, passwd, database):
    """Open a MySQL connection and store it on ``self.connection``.

    The connection uses the ``utf8mb4`` charset and ``DictCursor`` as its
    cursor class.

    Args:
        host: Database server host.
        user: User name to log in as.
        passwd: Password for ``user``.
        database: Name of the database to use.

    Returns:
        0 on success, 2 if connecting raised a PyMySQL error.
    """
    try:
        # Keyword arguments: PyMySQL 1.0+ rejects positional ones.
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=passwd,
            database=database,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
    except pymysql.err.MySQLError as exception:
        # MySQLError is the base class of DatabaseError and IntegrityError,
        # so this covers everything the original tuple listed.
        # stderr.write() only accepts text, so convert the exception first.
        sys.stderr.write(str(exception))
        # getattr: the attribute may never have been assigned if the very
        # first connection attempt fails.
        if getattr(self, 'connection', None) is None:
            sys.stderr.write("Problem connecting to database\n")
        return 2
    # The success code is returned here rather than from a `finally`
    # block, where it used to override the failure code above.
    return 0
def iter(self, query, *parameters, **kwparameters):
    """Yield one ``Row`` per result row of the given query and parameters.

    The query runs on a dedicated ``SSCursor`` that is closed once the
    generator finishes or is discarded.
    """
    self._ensure_connected()
    result_cursor = MySQLdb.cursors.SSCursor(self._db)
    try:
        self._execute(result_cursor, query, parameters, kwparameters)
        # The first item of every description entry is the column name.
        names = [field[0] for field in result_cursor.description]
        for values in result_cursor:
            yield Row(zip(names, values))
    finally:
        result_cursor.close()
def make_connection(args):
    """Create a connection to the ``igem`` database.

    `args` is the parsed result from the main arguments parser; its
    ``host``, ``port``, ``user`` and ``password`` attributes are used.

    Returns the connection object. If PyMySQL raises a ``DatabaseError``
    it is handed to ``_display_error`` and nothing is returned here.
    """
    settings = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'db': 'igem',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    try:
        connection = pymysql.connect(**settings)
    except pymysql.err.DatabaseError as e:
        _display_error(e)
    else:
        return connection
def __init__(self, cfg_name='mysql_tunnel'):
    """Read MySQL and tunnel settings from the luigi config, then connect.

    After loading the settings, the tunnel channel, the connection and a
    cursor are created via ``__tunnel__`` and ``__connect__``.

    Args:
        cfg_name: Name of the luigi configuration section to read.
    """
    # Function-scope imports kept from the original; presumably they make
    # a missing dependency fail here rather than later.
    import paramiko
    import pymysql.cursors
    import pymysql

    config = luigi.configuration.get_config()

    def option(key):
        # Look up one option in the selected configuration section.
        return config.get(cfg_name, key)

    self.host = option('host')
    self.database = option('database')
    self.user = option('user')
    self.password = option('password')
    self.port = int(option('port'))
    self.tunnel_dest_host = option('tunnel_dest_host')
    # NOTE(review): reads the same 'user' option as self.user; confirm a
    # dedicated tunnel user option is not intended.
    self.tunnel_dest_user = option('user')
    # Fixed: the option name used to carry a trailing space
    # ('tunnel_source_host '), which does not match the real option.
    self.tunnel_source_host = option('tunnel_source_host')
    # NOTE(review): kept as returned by the config, unlike `port`, which
    # is converted to int; verify what __tunnel__ expects.
    self.tunnel_source_port = option('tunnel_source_port')

    self.channel = self.__tunnel__()
    self.connection = self.__connect__()
    self.cursor = self.connection.cursor()
async def connect(self):
    """Prepare the instance's database and apps, then open the source DB.

    Declared ``async`` because the body awaits the instance's coroutines;
    ``await`` inside a plain ``def`` is a syntax error.

    Raises:
        Exception: If ``self.db_type`` is not ``'mysql'``.
    """
    await self.instance.db.connect()
    await self.instance.apps.discover()
    await self.instance.db.initiate()

    if self.db_type != 'mysql':
        raise Exception('We only support mysql converting right now!')

    self.connection = pymysql.connect(
        host=self.db_host,
        user=self.db_user,
        password=self.db_password,
        db=self.db_name,
        charset=self.charset,
        # Fall back to 3306 when no port is configured.
        port=self.db_port or 3306,
        cursorclass=pymysql.cursors.DictCursor,
    )
def __init__(self):
    """Load parameters via ``get_params()`` and connect to ``ceredron``.

    The connection is stored on ``self.conn`` and uses ``utf8mb4`` with
    ``DictCursor`` as its cursor class.
    """
    self.params = get_params()
    connect_kwargs = {
        'host': self.params.host,
        'user': self.params.username,
        'password': self.params.password,
        'db': 'ceredron',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    self.conn = pymysql.connect(**connect_kwargs)
def mysql_connect(self):
    """Open a connection to the configured MySQL server and return it.

    The connection uses the ``utf8mb4`` charset and ``DictCursor`` as its
    cursor class.

    :rtype: pymysql.Connection
    """
    logger.info('Connecting to MySQL %s', self._mysql_url)

    settings = dict(
        host=self._mysql_url,
        user=self._mysql_username,
        password=self._mysql_psw,
        db=self._mysql_dbname,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
    return pymysql.connect(**settings)
def handler(event, context):
    """Handle a CloudFormation custom-resource request for a MySQL database.

    Connects to the MySQL instance described by
    ``event['ResourceProperties']``, dispatches on ``event['RequestType']``
    (``Create`` / ``Update`` / ``Delete``) and reports the outcome through
    ``cfnresponse.send``.

    Args:
        event: Request payload; ``ResourceProperties``, ``RequestType`` and,
            depending on the request, ``OldResourceProperties`` and
            ``PhysicalResourceId`` are read from it.
        context: Invocation context, passed through to ``cfnresponse.send``.

    Returns:
        The result of ``cfnresponse.send`` on success; ``None`` when the
        request failed and ``FAILED`` was reported.
    """
    global connection, kmsKeyARN, application
    # Reset first so the cleanup below never touches an undefined handle,
    # or one left over from an earlier invocation, when we fail before
    # connecting.
    connection = None
    try:
        print(event)
        properties = event['ResourceProperties']
        kmsKeyARN = properties['KMSKeyARN']
        application = properties['Application']

        print('Connecting to mysql instance..')
        connection = pymysql.connect(
            host=properties['Hostname'],
            port=int(properties['Port']),
            user=properties['Username'],
            password=decrypt(properties['Password']),
            cursorclass=pymysql.cursors.DictCursor,
        )
        print('Connected.')

        response = {}
        request_type = event['RequestType']
        if request_type == 'Create':
            response = create(database=properties['Database'])
        elif request_type == 'Update':
            response = update(
                database=properties['Database'],
                old_database=event['OldResourceProperties']['Database'],
                username=event['PhysicalResourceId'],
            )
        elif request_type == 'Delete':
            if properties['RetainDatabase'] == 'false':
                delete(
                    database=properties['Database'],
                    username=event['PhysicalResourceId'],
                )
            else:
                print('Retaining Database.')
        else:
            # NOTE(review): an unknown request type is only logged and
            # SUCCESS is still reported below; confirm that is intended.
            print('Invalid RequestType: ' + request_type)

        return cfnresponse.send(
            event,
            context,
            cfnresponse.SUCCESS,
            response_data=response,
            physical_resource_id=(
                response['PhysicalResourceId']
                if 'PhysicalResourceId' in response else None
            ),
        )
    except Exception:
        # Top-level boundary: log the traceback and report FAILED.
        logging.exception("Unhandled Exception")
        cfnresponse.send(event, context, cfnresponse.FAILED)
    finally:
        if connection is not None:
            print('closing connection.')
            connection.close()
def __init__(self, host, port, database, user, password):
    """Connect to MySQL and keep the connection on ``self.conn``.

    Args:
        host: Database server host.
        port: Database server port.
        database: Name of the database to use.
        user: User name to log in as.
        password: Password for ``user``.
    """
    connect_options = {
        'host': host,
        'port': port,
        'db': database,
        'user': user,
        'password': password,
        'cursorclass': pymysql.cursors.DictCursor,
    }
    self.conn = pymysql.connect(**connect_options)
def pymysql_conn_from_info(conn_info: ConnectionInfo) -> pymysql.Connection:
    """Open a PyMySQL connection from the fields of ``conn_info``.

    ``host``, ``user``, ``password``, ``db`` and ``charset`` are taken from
    ``conn_info``; the connection is created with ``autocommit=True`` and
    ``DictCursor`` as its cursor class.
    """
    options = {
        name: getattr(conn_info, name)
        for name in ('host', 'user', 'password', 'db', 'charset')
    }
    options['cursorclass'] = pymysql.cursors.DictCursor
    options['autocommit'] = True
    return pymysql.connect(**options)
def _connect(self):
    """Open the PyMySQL connection on first use and configure the session.

    Does nothing when ``self._connection`` is already set. Charset,
    collation, SQL mode and time zone come from the ``DB_CHARSET``,
    ``DB_COLLATION``, ``DB_SQL_MODE`` and ``DB_TIMEZONE`` environment
    variables, each with a built-in default.
    """
    if self._connection is not None:
        return

    # Don't set "use_unicode=0": the SQLAlchemy people say not to set it at
    # all, see http://docs.sqlalchemy.org/en/latest/dialects/mysql.html
    charset = os.getenv('DB_CHARSET', 'utf8mb4')
    collation = os.getenv('DB_COLLATION', 'utf8mb4_general_ci')
    sql_mode = os.getenv(
        'DB_SQL_MODE',
        "'ANSI,STRICT_TRANS_TABLES,STRICT_ALL_TABLES,ONLY_FULL_GROUP_BY'")
    time_zone = os.getenv('DB_TIMEZONE', "'+2:00'")

    self._connection = pymysql.connect(
        host=self._host,
        user=self._user,
        passwd=self._password,
        database=self._database,
        port=self._port,
        charset=charset,
        cursorclass=pymysql.cursors.DictCursor,
    )

    # Force MySQL to the configured charset and collation. The original
    # kept two disabled alternatives for this step:
    #   SET NAMES <charset> COLLATE <collation>
    #   SET CHARACTER_SET_RESULTS='<charset>'
    self.set_charset(charset, collation)

    # Make MySQL conform more closely to SQL standards, so it behaves like
    # PostgreSQL or SQLite regarding syntax interpretation and invalid data
    # handling. See https://www.drupal.org/node/344575 for discussion.
    # MySQL 5.5 changed the meaning of TRADITIONAL, so the modes are
    # spelled out one by one.
    self.query("SET SESSION sql_mode = {}".format(sql_mode))

    # Pin the session time zone.
    self.query("SET SESSION time_zone = {}".format(time_zone))
def saveData(self, category):
    """Insert every entry of ``self._res`` into ``res`` for one category.

    The category is resolved to a row of the ``type`` table by its ``py``
    column; each entry's ``name`` and ``res`` values are then inserted
    with that row's ``id``.

    Args:
        category: Value matched against the ``py`` column of ``type``.

    Raises:
        ValueError: If no ``type`` row matches ``category``.
    """
    import pymysql.cursors
    import pymysql

    # NOTE(review): connection settings are hard-coded (local root account
    # with an empty password); consider moving them to configuration.
    connection = pymysql.connect(host='localhost',
                                 user='root',
                                 password='',
                                 db='hht',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    try:
        # Look up the category record.
        with connection.cursor() as cursor:
            sql = "SELECT `id`, `typename`, `py` FROM `type` WHERE `py` = %s"
            cursor.execute(sql, (category,))
            type_row = cursor.fetchone()

        # fetchone() yields None when nothing matched; fail with a clear
        # message instead of a TypeError on the subscript below.
        if type_row is None:
            raise ValueError('Unknown category: %r' % (category,))

        # Insert all collected resources in one batch, then commit once.
        rows = [
            (type_row['id'], item['name'], item['res'])
            for item in self._res
        ]
        with connection.cursor() as cursor:
            sql = "INSERT INTO `res` (`type_id`, `name`, `link`) VALUES (%s, %s, %s)"
            cursor.executemany(sql, rows)
        connection.commit()

        # Single-argument print() behaves the same on Python 2 and 3.
        print('Insert ' + str(len(self._res)) + ' datas with category '
              + type_row['typename'])
    finally:
        connection.close()
def from_settings(cls, settings):
    """Build an instance around a ``pymysql`` connection pool.

    The keyword arguments for ``adbapi.ConnectionPool`` are taken from
    ``settings['MYSQLDB']``.
    """
    pool_kwargs = settings['MYSQLDB']
    # Left disabled in the original:
    # pool_kwargs['cursorclass'] = pymysql.cursors.DictCursor
    return cls(adbapi.ConnectionPool('pymysql', **pool_kwargs))
def __init__(self, cfg_name='mysql'):
    """Read MySQL settings from the luigi config and open a connection.

    ``host``, ``database``, ``user``, ``password`` and ``port`` are read
    from the section named ``cfg_name``; the connection comes from
    ``__connect__`` and a cursor is created on it.
    """
    import pymysql.cursors
    import pymysql

    section = luigi.configuration.get_config()
    # These options are stored as read; only the port is converted.
    for key in ('host', 'database', 'user', 'password'):
        setattr(self, key, section.get(cfg_name, key))
    self.port = int(section.get(cfg_name, 'port'))

    self.connection = self.__connect__()
    self.cursor = self.connection.cursor()
def query(self, query: str, args=None) -> Cursor:
    """Execute ``query`` with ``args`` on a new ``DictCursor``.

    Returns the cursor after execution so the caller can fetch from it.
    """
    dict_cursor = self.cnx.cursor(pymysql.cursors.DictCursor)
    dict_cursor.execute(query, args)
    return dict_cursor
def __init__(self):
    """Connect to the ``ressie`` database on 127.0.0.1:3307.

    The connection is stored on ``self.connection``. A connection failure
    is printed rather than raised, and the attribute is then ``None``.
    """
    # Define the attribute up front so it exists (as None) even when the
    # connection attempt below fails.
    self.connection = None
    try:
        self.connection = pymysql.connect(
            host="127.0.0.1",
            port=3307,
            user="ressie",
            password="123456",
            db="ressie",
            cursorclass=pymysql.cursors.DictCursor,
        )
    except Exception as e:
        # Best-effort, as in the original: report the error and continue.
        print(e)