Python pymysql 模块,cursors() 实例源码

我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用pymysql.cursors()

项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def execute_cursor(self, S, server_side=False):
        """Execute the SQL statement ``S`` and return the cursor that ran it.

        For both supported back ends the rows produced by the returned cursor
        are dictionaries keyed by column name.

        Args:
            S: SQL statement; leading and trailing whitespace is stripped.
            server_side: For MySQL, request a ``pymysql.cursors.SSDictCursor``
                instead of a ``DictCursor``. Not consulted for SQLite.

        Returns:
            The cursor on which ``S`` has been executed.

        Raises:
            ValueError: If ``self.db_type`` is neither ``SQL_MYSQL`` nor
                ``SQL_SQLITE``.
        """
        def dict_factory(cursor, row):
            # Row factory for SQLite: map every column name to its value.
            return {column[0]: row[i]
                    for i, column in enumerate(cursor.description)}

        S = S.strip()
        if options.cfg.explain_queries:
            self.explain(S)
        logger.debug(S)

        if self.db_type == SQL_MYSQL:
            # Replace the stored connection when it reports not being open.
            if not self.Con.open:
                self.Con = self.get_connection()
            cursor_class = (pymysql.cursors.SSDictCursor if server_side
                            else pymysql.cursors.DictCursor)
            cursor = self.Con.cursor(cursor_class)
        elif self.db_type == SQL_SQLITE:
            con = self.get_connection()
            con.row_factory = dict_factory
            cursor = con.cursor()
        else:
            # Without this branch the code below would fail with an
            # UnboundLocalError that hides the real problem.
            raise ValueError(
                "Unsupported database type: {!r}".format(self.db_type))
        cursor.execute(S)
        return cursor
项目:Gravimetrics    作者:NPPC-UK    | 项目源码 | 文件源码
def create_connection(self, host, user, passwd, database):
        """Open a MySQL connection and store it on ``self.connection``.

        The connection is created with the ``utf8mb4`` charset and
        ``pymysql.cursors.DictCursor`` as its cursor class.

        Args:
            host: Host name passed to ``pymysql.connect``.
            user: User name passed to ``pymysql.connect``.
            passwd: Password passed to ``pymysql.connect``.
            database: Database name passed to ``pymysql.connect``.

        Returns:
            0 when the connection was created, 2 when pymysql raised an
            error (the error text is written to stderr).
        """
        try:
            # Keyword arguments only: positional arguments to connect() are
            # rejected by PyMySQL 1.0 and later.
            self.connection = pymysql.connect(
                host=host,
                user=user,
                password=passwd,
                database=database,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor)

        except pymysql.err.MySQLError as exception:
            # MySQLError is the base class of DatabaseError and
            # IntegrityError, so this covers everything caught before.
            sys.stderr.write(str(exception))
            return 2

        finally:
            # getattr: the attribute may not exist yet if connect() failed.
            if getattr(self, 'connection', None) is None:
                sys.stderr.write("Problem connecting to database\n")
        return 0
项目:Gravimetrics    作者:NPPC-UK    | 项目源码 | 文件源码
def create_connection(self, host, user, passwd, database):
        """Open a MySQL connection and store it on ``self.connection``.

        The connection is created with the ``utf8mb4`` charset and
        ``pymysql.cursors.DictCursor`` as its cursor class.

        Args:
            host: Host name passed to ``pymysql.connect``.
            user: User name passed to ``pymysql.connect``.
            passwd: Password passed to ``pymysql.connect``.
            database: Database name passed to ``pymysql.connect``.

        Returns:
            0 when the connection was created, 2 when pymysql raised an
            error (the error text is written to stderr).
        """
        try:
            # Keyword arguments only: positional arguments to connect() are
            # rejected by PyMySQL 1.0 and later.
            self.connection = pymysql.connect(
                host=host,
                user=user,
                password=passwd,
                database=database,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor)

        except pymysql.err.MySQLError as exception:
            # stderr.write() only accepts text; passing the exception object
            # itself raises TypeError and hides the original error.
            sys.stderr.write(str(exception))
            return 2

        finally:
            # getattr: the attribute may not exist yet if connect() failed.
            if getattr(self, 'connection', None) is None:
                sys.stderr.write("Problem connecting to database\n")
        return 0
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def iter(self, query, *parameters, **kwparameters):
        """Yield one ``Row`` per result row of ``query``.

        The statement runs on a ``MySQLdb.cursors.SSCursor`` created for this
        call; each row is paired with the column names from the cursor
        description. The cursor is closed when iteration ends, including
        when the generator is abandoned early.
        """
        self._ensure_connected()
        streaming_cursor = MySQLdb.cursors.SSCursor(self._db)
        try:
            self._execute(streaming_cursor, query, parameters, kwparameters)
            names = [column[0] for column in streaming_cursor.description]
            for record in streaming_cursor:
                yield Row(zip(names, record))
        finally:
            streaming_cursor.close()
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def make_connection(args):
    """
    Build a pymysql connection to the `igem` database.

    `args` is the parsed result from main arguments parser; its `host`,
    `port`, `user` and `password` attributes are used.

    Returns the new connection. When pymysql raises a `DatabaseError` the
    error is handed to `_display_error` and nothing is returned.
    """

    try:
        settings = dict(
            host=args.host,
            port=args.port,
            user=args.user,
            password=args.password,
            db='igem',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
        connection = pymysql.connect(**settings)
    except pymysql.err.DatabaseError as error:
        _display_error(error)
    else:
        return connection
项目:luigi-warehouse    作者:groupon    | 项目源码 | 文件源码
def __init__(self,cfg_name='mysql_tunnel'):
        """Read the settings in section ``cfg_name`` and open the connection.

        Every value comes from the luigi configuration section ``cfg_name``.
        After the settings are stored, ``self.__tunnel__()`` and
        ``self.__connect__()`` are called (in that order) and a cursor is
        created from the resulting connection.

        Args:
            cfg_name: Name of the luigi configuration section to read.
        """
        # NOTE(review): these local imports are not used in this method;
        # presumably they make the modules available/fail early -- confirm.
        import paramiko
        import pymysql.cursors
        import pymysql
        config = luigi.configuration.get_config()
        self.host = config.get(cfg_name, 'host')
        self.database = config.get(cfg_name, 'database')
        self.user = config.get(cfg_name, 'user')
        self.password = config.get(cfg_name, 'password')
        self.port = int(config.get(cfg_name, 'port'))
        self.tunnel_dest_host = config.get(cfg_name, 'tunnel_dest_host')
        # NOTE(review): reads the same 'user' option as self.user -- confirm
        # the tunnel and the database are meant to share a user name.
        self.tunnel_dest_user = config.get(cfg_name, 'user')
        # The option name previously carried a trailing space
        # ('tunnel_source_host '), which cannot match the real option.
        self.tunnel_source_host = config.get(cfg_name, 'tunnel_source_host')
        self.tunnel_source_port = config.get(cfg_name, 'tunnel_source_port')
        self.channel = self.__tunnel__()
        self.connection = self.__connect__()
        self.cursor = self.connection.cursor()
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
async def connect(self):
        """Prepare the instance and open the source MySQL connection.

        Awaits ``instance.db.connect()``, ``instance.apps.discover()`` and
        ``instance.db.initiate()`` in that order, then connects with pymysql
        using the ``db_*`` and ``charset`` attributes of ``self`` and stores
        the result on ``self.connection``. Port 3306 is used when
        ``self.db_port`` is falsy.

        The body uses ``await``, so this has to be a coroutine function; a
        plain ``def`` here is a SyntaxError.

        Raises:
            Exception: If ``self.db_type`` is not ``'mysql'``.
        """
        await self.instance.db.connect()
        await self.instance.apps.discover()
        await self.instance.db.initiate()

        if self.db_type != 'mysql':
            raise Exception('We only support mysql converting right now!')

        self.connection = pymysql.connect(
            host=self.db_host, user=self.db_user, password=self.db_password, db=self.db_name, charset=self.charset,
            port=self.db_port or 3306,
            cursorclass=pymysql.cursors.DictCursor
        )
项目:math_bot    作者:chirag9127    | 项目源码 | 文件源码
def __init__(self):
        """Load the connection parameters and connect to the database.

        The parameters returned by ``get_params()`` are kept on
        ``self.params``; the pymysql connection to the ``ceredron`` database
        is kept on ``self.conn``.
        """
        params = get_params()
        self.params = params
        connect_kwargs = {
            'host': params.host,
            'user': params.username,
            'password': params.password,
            'db': 'ceredron',
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }
        self.conn = pymysql.connect(**connect_kwargs)
项目:myRabbit    作者:bsab    | 项目源码 | 文件源码
def mysql_connect(self):
        """Open a new MySQL connection and return its handle.

        The target is logged first; the connection uses the ``_mysql_*``
        attributes of ``self``, the ``utf8mb4`` charset and ``DictCursor``.

        :rtype: pymysql.Connection

        """
        logger.info('Connecting to MySQL %s', self._mysql_url)

        settings = {
            'host': self._mysql_url,
            'user': self._mysql_username,
            'password': self._mysql_psw,
            'db': self._mysql_dbname,
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }
        handle = pymysql.connect(**settings)
        return handle
项目:drupal-cloudformation    作者:jwilson8767    | 项目源码 | 文件源码
def handler(event, context):
    """Handle one custom-resource request and report the outcome.

    Connects to the MySQL instance described by
    ``event['ResourceProperties']``, dispatches on ``event['RequestType']``
    (``Create``, ``Update`` or ``Delete``) and sends the result through
    ``cfnresponse.send``. Any exception is logged and reported as
    ``cfnresponse.FAILED``. The connection opened here is closed before
    returning.

    Args:
        event: Request dictionary; ``ResourceProperties``, ``RequestType``
            and, depending on the request type, ``OldResourceProperties``
            and ``PhysicalResourceId`` are read.
        context: Passed through unchanged to ``cfnresponse.send``.

    Returns:
        The value of ``cfnresponse.send`` on success, ``None`` after a
        failure has been reported.
    """
    global connection, kmsKeyARN, application
    # Reset the shared handle first so the ``finally`` clause only ever
    # closes a connection opened by this call (the global may otherwise be
    # unset or left over from an earlier call).
    connection = None
    try:
        print(event)
        properties = event['ResourceProperties']
        kmsKeyARN = properties['KMSKeyARN']
        application = properties['Application']
        print('Connecting to mysql instance..')
        connection = pymysql.connect(host=properties['Hostname'],
                                     port=int(properties['Port']),
                                     user=properties['Username'],
                                     password=decrypt(properties['Password']),
                                     cursorclass=pymysql.cursors.DictCursor)
        print('Connected.')
        response = {}
        request_type = event['RequestType']
        if request_type == 'Create':
            response = create(database=properties['Database'])
        elif request_type == 'Update':
            response = update(database=properties['Database'],
                              old_database=event['OldResourceProperties']['Database'],
                              username=event['PhysicalResourceId'])
        elif request_type == 'Delete':
            if properties['RetainDatabase'] == 'false':
                delete(database=properties['Database'], username=event['PhysicalResourceId'])
            else:
                print('Retaining Database.')
        else:
            print('Invalid RequestType: ' + request_type)

        return cfnresponse.send(event,
                                context,
                                cfnresponse.SUCCESS,
                                response_data=response,
                                physical_resource_id=response.get('PhysicalResourceId'))
    except Exception:
        logging.exception("Unhandled Exception")
        cfnresponse.send(event, context, cfnresponse.FAILED)
    finally:
        # Nothing to close when connecting itself failed.
        if connection is not None:
            print('closing connection.')
            connection.close()
项目:memsql-top    作者:memsql    | 项目源码 | 文件源码
def __init__(self, host, port, database, user, password):
        """Connect with pymysql and keep the connection on ``self.conn``.

        The connection is created with ``DictCursor`` as its cursor class.
        """
        settings = dict(
            host=host,
            port=port,
            db=database,
            user=user,
            password=password,
            cursorclass=pymysql.cursors.DictCursor,
        )
        self.conn = pymysql.connect(**settings)
项目:graphscale    作者:schrockn    | 项目源码 | 文件源码
def pymysql_conn_from_info(conn_info: ConnectionInfo) -> pymysql.Connection:
    """Create a pymysql connection from ``conn_info``.

    Host, user, password, database and charset are taken from ``conn_info``;
    the connection uses ``DictCursor`` and is opened with ``autocommit=True``.
    """
    connect_kwargs = {
        'host': conn_info.host,
        'user': conn_info.user,
        'password': conn_info.password,
        'db': conn_info.db,
        'charset': conn_info.charset,
        'cursorclass': pymysql.cursors.DictCursor,
        'autocommit': True,
    }
    return pymysql.connect(**connect_kwargs)
项目:ezrecords    作者:dareenzo    | 项目源码 | 文件源码
def _connect(self):
        """Open the MySQL connection on first use and configure the session.

        Does nothing when ``self._connection`` is already set. Otherwise it
        connects with pymysql (``DictCursor``), then applies the charset and
        collation, the SQL mode and the time zone. Those four settings are
        read from the ``DB_CHARSET``, ``DB_COLLATION``, ``DB_SQL_MODE`` and
        ``DB_TIMEZONE`` environment variables, with defaults.
        """
        if self._connection is not None:
            return

        charset = os.getenv('DB_CHARSET', 'utf8mb4')
        collation = os.getenv('DB_COLLATION', 'utf8mb4_general_ci')
        sql_mode = os.getenv('DB_SQL_MODE', "'ANSI,STRICT_TRANS_TABLES,STRICT_ALL_TABLES,ONLY_FULL_GROUP_BY'")
        time_zone = os.getenv('DB_TIMEZONE', "'+2:00'")

        # "use_unicode=0" is deliberately not passed: the SQLAlchemy MySQL
        # dialect notes advise against setting it at all, see
        # http://docs.sqlalchemy.org/en/latest/dialects/mysql.html
        connect_kwargs = dict(
            host=self._host,
            user=self._user,
            passwd=self._password,
            database=self._database,
            port=self._port,
            charset=charset,
            cursorclass=pymysql.cursors.DictCursor,
        )
        self._connection = pymysql.connect(**connect_kwargs)

        # Apply the configured charset and collation to the session.
        self.set_charset(charset, collation)

        # Spell out the SQL modes one by one so MySQL follows the SQL
        # standard more closely in syntax interpretation and invalid-data
        # handling; MySQL 5.5 changed the meaning of TRADITIONAL. See
        # https://www.drupal.org/node/344575 for further discussion.
        self.query("SET SESSION sql_mode = %s" % sql_mode)

        self.query("SET SESSION time_zone = %s" % time_zone)
项目:hht    作者:zlizhe    | 项目源码 | 文件源码
def saveData(self, category):
        """Insert every entry of ``self._res`` into ``res`` under ``category``.

        Looks up the ``type`` row whose ``py`` column equals ``category``,
        inserts one ``res`` row (``type_id``, ``name``, ``link``) per entry of
        ``self._res``, commits, and prints a summary line. The connection is
        closed in every case.

        Args:
            category: Value matched against the ``py`` column of ``type``.

        Raises:
            ValueError: If no ``type`` row matches ``category``.
        """
        import pymysql.cursors
        import pymysql
        # NOTE(review): connection settings are hard-coded (root user, empty
        # password) -- consider moving them to configuration.
        connection = pymysql.connect(host='localhost',
                                     user='root',
                                     password='',
                                     db='hht',
                                     charset='utf8mb4',
                                     cursorclass=pymysql.cursors.DictCursor)

        try:
            # Resolve the category to its row in `type`.
            with connection.cursor() as cursor:
                sql = "SELECT `id`, `typename`, `py` FROM `type` WHERE `py` = %s"
                cursor.execute(sql, (category,))
                type_row = cursor.fetchone()

            # fetchone() returns None when nothing matched; fail with a clear
            # message instead of a TypeError on the subscript below.
            if type_row is None:
                raise ValueError('Unknown category: %r' % (category,))

            # Store every collected entry under that type.
            with connection.cursor() as cursor:
                sql = "INSERT INTO `res` (`type_id`, `name`, `link`) VALUES (%s, %s, %s)"
                for entry in self._res:
                    cursor.execute(sql, (type_row['id'], entry['name'], entry['res']))

            connection.commit()
            # Single parenthesised argument: valid on Python 2 and Python 3.
            print('Insert ' + str(len(self._res)) + ' datas with category ' + type_row['typename'])

        finally:
            connection.close()

    # ????
项目:python    作者:panxus    | 项目源码 | 文件源码
def from_settings(cls,settings):
        """Build an instance of ``cls`` around a pymysql connection pool.

        The keyword arguments for ``adbapi.ConnectionPool`` are taken from
        ``settings['MYSQLDB']``. No ``cursorclass`` is added to them here.
        """
        pool_kwargs = settings['MYSQLDB']
        return cls(adbapi.ConnectionPool('pymysql', **pool_kwargs))
项目:luigi-warehouse    作者:groupon    | 项目源码 | 文件源码
def __init__(self, cfg_name='mysql'):
        """Read the settings in section ``cfg_name`` and open the connection.

        Host, database, user, password and port (converted to ``int``) come
        from the luigi configuration section ``cfg_name``. The connection
        returned by ``self.__connect__()`` and a cursor created from it are
        then stored on the instance.
        """
        import pymysql.cursors
        import pymysql
        config = luigi.configuration.get_config()

        def setting(option):
            # Look the option up in the section chosen by the caller.
            return config.get(cfg_name, option)

        self.host = setting('host')
        self.database = setting('database')
        self.user = setting('user')
        self.password = setting('password')
        self.port = int(setting('port'))
        self.connection = self.__connect__()
        self.cursor = self.connection.cursor()
项目:python-WCA    作者:LinusFresz    | 项目源码 | 文件源码
def query(self, query: str, args=None) -> Cursor:
        """Execute ``query`` with ``args`` and return the cursor used.

        A new ``DictCursor`` is created on ``self.cnx`` for each call; the
        caller fetches the results from the returned cursor.
        """
        dict_cursor = self.cnx.cursor(pymysql.cursors.DictCursor)
        dict_cursor.execute(query, args)
        return dict_cursor
项目:ressie    作者:lpredova    | 项目源码 | 文件源码
def __init__(self):
        """Try to connect to the ``ressie`` database.

        On success the pymysql connection (``DictCursor``) is stored on
        ``self.connection``. If connecting raises, the error is printed and
        ``self.connection`` stays ``None``.
        """
        # Defined up front so the attribute exists even when connecting
        # fails; otherwise later access raises AttributeError.
        self.connection = None

        # NOTE(review): host, port and credentials are hard-coded --
        # consider moving them to configuration.
        try:
            self.connection = pymysql.connect(host="127.0.0.1",
                                              port=3307,
                                              user="ressie",
                                              password="123456",
                                              db="ressie",
                                              cursorclass=pymysql.cursors.DictCursor)

        except Exception as e:
            print(e)