Python pymysql 模块,escape_string() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用pymysql.escape_string()

项目:my-scrapy    作者:azraelkuan    | 项目源码 | 文件源码
def process_item(self, item, spider):
        if self.__class__.__name__ in spider.pipelines:
            try:
                table = {
                    'jp': 'tabelog_jp',
                    'en': 'tabelog_en'
                }
                table_index = item['lang']
                table_name = table[table_index]

                sql = "insert into %s (store_id, `name`, address, center, nearest_station, main_type, sub_type, sub_sub_type, tag, average_rating, dinner_rating, " \
                      "lunch_rating, rating_review_num, dinner_price, lunch_price, photo_num, tel, sub_tel, open_time, seats_num, drink, opening_day, city_name, area_name, url) " \
                      "values(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')"

                data = (table_name, item['store_id'], item['name'], item['address'], item['center'], item['nearest_station'], pymysql.escape_string(item['main_type']), pymysql.escape_string(item['sub_type']),
                        item['sub_sub_type'], item['tag'], item['average_rating'], item['dinner_rating'],
                        item['lunch_rating'], item['rating_review_num'], item['dinner_price'], item['lunch_price'], item['photo_num'], item['tel'], item['sub_tel'], item['open_time'],
                        item['seats_num'], item['drink'], item['opening_day'], item['city_name'], item['area_name'], item['url'])
                conn, cur = connDB()
                cur.execute(sql % data)
                conn.commit()
                cur.close()
                conn.close()
            except:
                print("********data exists********")
项目:Pt-Autoseed    作者:Rhilip    | 项目源码 | 文件源码
def upsert_seed_list(self, torrent_info):
        tid, name, tracker = torrent_info

        while True:
            if name in self.cache_torrent_name:
                raw_sql = "UPDATE `seed_list` SET `{cow}` = {id:d} WHERE `title`='{name}'"
                break
            else:
                exist = "SELECT COUNT(*) FROM `seed_list` WHERE `title`='{}'".format(pymysql.escape_string(name))
                if self.exec(sql=exist)[0] == 0:
                    raw_sql = "INSERT INTO `seed_list` (`title`,`{cow}`) VALUES ('{name}',{id:d})"
                    break
                else:
                    self.cache_torrent_list()

        sql = raw_sql.format(cow=tracker, name=pymysql.escape_string(name), id=tid)
        return self.exec(sql=sql)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def escape(self, string):
        return pymysql.escape_string(string)
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:wyproxy    作者:ring04h    | 项目源码 | 文件源码
def join_field_value(self, data, glue = ', '):
        sql = comma = ''
        for key, value in data.iteritems():
            if isinstance(value, str):
                value = pymysql.escape_string(value)
            sql +=  "{}`{}` = '{}'".format(comma, key, value)
            comma = glue
        return sql
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:deb-python-pymysql    作者:openstack    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:ServerlessCrawler-VancouverRealState    作者:MarcelloLins    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:news_crawler    作者:Writtic    | 项目源码 | 文件源码
def escape(self, s):
        '''
        ???, ???? ? SQL ????? ??? ????? ? ??? ESCAPE?? ????.
        '''
        if s is None: return None
        return pymysql.escape_string(s)
项目:proxysql-tools    作者:twindb    | 项目源码 | 文件源码
def register_backend(self, backend):
        """Register Galera node in ProxySQL

        :param backend: Galera node.
        :type backend: ProxySQLMySQLBackend
        """
        if backend.comment:
            comment = "'%s'" % pymysql.escape_string(backend.comment)
        else:
            comment = 'NULL'

        query = "REPLACE INTO mysql_servers(`hostgroup_id`," \
                " `hostname`, `port`," \
                " `status`, `weight`, `compression`, `max_connections`," \
                " `max_replication_lag`, `use_ssl`, `max_latency_ms`," \
                " `comment`) " \
                "VALUES({hostgroup_id}, '{hostname}', {port}," \
                " '{status}', {weight}, {compression}, {max_connections}," \
                " {max_replication_lag}, {use_ssl}, {max_latency_ms}," \
                " {comment})" \
                "".format(hostgroup_id=int(backend.hostgroup_id),
                          hostname=pymysql.escape_string(backend.hostname),
                          port=int(backend.port),
                          status=pymysql.escape_string(backend.status),
                          weight=int(backend.weight),
                          compression=int(backend.compression),
                          max_connections=int(backend.max_connections),
                          max_replication_lag=int(backend.max_replication_lag),
                          use_ssl=int(backend.use_ssl),
                          max_latency_ms=int(backend.max_latency_ms),
                          comment=comment)
        self.execute(query)
        self.reload_runtime()
项目:proxysql-tools    作者:twindb    | 项目源码 | 文件源码
def deregister_backend(self, backend):
        """
        Deregister a Galera node from ProxySQL

        :param backend: Galera node.
        :type backend: ProxySQLMySQLBackend
        """
        query = "DELETE FROM mysql_servers WHERE hostgroup_id={hostgroup_id}" \
                " AND hostname='{hostname}'" \
                " AND port={port}" \
                "".format(hostgroup_id=int(backend.hostgroup_id),
                          hostname=pymysql.escape_string(backend.hostname),
                          port=int(backend.port))
        self.execute(query)
        self.reload_runtime()
项目:prophessor    作者:paulhauner    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:Pt-Autoseed    作者:Rhilip    | 项目源码 | 文件源码
def get_data_clone_id(self, key, site) -> None or int:
        clone_id = None

        key = pymysql.escape_string(re.sub(r"[_\-. ]", "%", key))
        sql = "SELECT `{site}` FROM `info_list` WHERE `search_name` LIKE '{key}%'".format(site=site, key=key)
        try:  # Get clone id info from database
            clone_id = int(self.exec(sql=sql)[0])
        except TypeError:  # The database doesn't have the search data, Return dict only with raw key.
            logging.warning(
                "No record for key: \"{key}\" in \"{site}\". Or may set as `None`".format(key=key, site=site)
            )

        return clone_id
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:power-pi    作者:Knapsacks    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:power-pi    作者:Knapsacks    | 项目源码 | 文件源码
def test_escape_fallback_encoder(self):
        con = self.connections[0]
        cur = con.cursor()

        class Custom(str):
            pass

        mapping = {text_type: pymysql.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
项目:cc98    作者:zjuchenyuan    | 项目源码 | 文件源码
def handler(meta, boardid, id, result, big):
    """
    ???????????????????????
    :param meta: ?mpms??
    :param boardid: ??id
    :param id: ??id
    :param result: ??????? list?? [??lc, ???user, ????content, ????posttime, ??????lastedittime]
    :param big: ???? ""?"big"
    :return: ????
    """
    if len(result) == 0:
        return
    if len(result) > 1000:  # avoid too long sql
        handler(meta, boardid, id, result[1000:], big)
        result = result[:1000]
    if result[0][0] == 0:  # ??????sql????result[0]?????????????????????
        try:
            showline = [boardid, id, result[0][2], len(result)]
            if myip != "":
                showline.insert(0, myip)  # if enables multiple ip, print IP first
            print(" ".join(str(i) for i in (showline)))
        except:
            try:
                print(" ".join(str(i) for i in (boardid, id, pformat(result[0][2]), len(result))))
            except:
                print("Something cannot print")
    global conn
    sql = "insert ignore into {}bbs_{}(id,lc,user,content,posttime,edittime,gettime) values ".format(big, boardid)
    for i in result:
        sql += "({},{},\"{}\",\"{}\",\"{}\",\"{}\",now()),".format(id, i[0],
                                                                   pymysql.escape_string(i[1]),
                                                                   pymysql.escape_string(i[2]), i[3], i[4])
    # print(sql)
    sql = sql[:-1]
    # ??????utf8mb4??????????emoji??
    cur = conn.cursor()
    try:
        cur.execute(
            "SET NAMES utf8mb4;SET CHARACTER SET utf8mb4; SET character_set_connection=utf8mb4;")  # ?????????????
    except:
        conn = db()
        cur.execute("SET NAMES utf8mb4;SET CHARACTER SET utf8mb4; SET character_set_connection=utf8mb4;")
    try:
        cur.execute(sql)
        conn.commit()
    except pymysql.err.ProgrammingError as e:  # ??????????????????????
        createTable(boardid, big=big)
        cur.execute(sql)
        conn.commit()
    except Exception as e:
        print(e)
项目:wizard    作者:honor100    | 项目源码 | 文件源码
def fetch_mysql_cluster_metadata():
    clusters = getAllMySQLInfo(cluster_role=1, flag='online')
    for cluster in clusters:
        fetch_mysql_metadata(cluster)
        # cluster_name = cluster.cluster_name
        # cluster_port = cluster.cluster_port
        # # ????????????????????????
        # dictSlaveConn = getSlaveConnStr(cluster_name)
        # if dictSlaveConn:
        #     dictConn = dictSlaveConn
        # else:
        #     dictConn = getMasterConnStr(cluster_name)
        # Host = dictConn['Host']
        # Port = dictConn['Port']
        # User = dictConn['User']
        # Password = dictConn['Password']
        # # ?????????
        # sqlContent = "select table_schema,table_name,table_type,engine,row_format,table_rows,avg_row_length," \
        #              "data_length,max_data_length,index_length,data_free,auto_increment,create_time,table_collation," \
        #              "create_options,table_comment from tables where table_schema not in {}".format(
        #     ('mysql', 'sys', 'mondmm', 'test', 'information_schema', 'performance_schema'),)
        # cluster_db = "information_schema"
        # try:
        #     _, results = mdb_query(sqlContent, Host, Port, User, Password, cluster_db, True)
        # except Exception as e:
        #     print("Mysql Error : %s" % e)
        #     return None
        # # ??????????
        # for result in results:
        #     table_schema = result.get('table_schema')
        #     table_name =   result.get('table_name')
        #     _, res = mdb_query("show create table {}".format(table_name), Host, Port, User, Password, table_schema)
        #     create_statement = pymysql.escape_string(res[0][1])
        #     result['create_statement'] = create_statement
        #     # ?????,????????????????.   (cluster_port, table_schema, table_name??????unique??)
        #     metadatas = mysql_cluster_metadata.objects.filter(Q(cluster_port=cluster_port) & Q(table_schema=table_schema) & Q(table_name=table_name))
        #     if metadatas:
        #         metadata = metadatas[0]
        #     else:
        #         metadata = mysql_cluster_metadata()
        #     metadata.cluster_name = cluster_name
        #     metadata.cluster_port = cluster_port
        #     metadata.table_schema = result['table_schema']
        #     metadata.table_name = result['table_name']
        #     metadata.table_type = result['table_type']
        #     metadata.engine = result['engine']
        #     metadata.row_format = result['row_format']
        #     metadata.table_rows = result['table_rows'] if result['table_rows'] else 0
        #     metadata.avg_row_length = result['avg_row_length']
        #     metadata.data_length = result['data_length']
        #     metadata.max_data_length = result['max_data_length']
        #     metadata.index_length = result['index_length']
        #     metadata.data_free = result['data_free']
        #     metadata.auto_increment = result['auto_increment'] if result['auto_increment'] else 0
        #     metadata.create_time = result['create_time']
        #     metadata.table_collation = result['table_collation']
        #     metadata.create_statement = result['create_statement']
        #     metadata.create_options = result['create_options']
        #     metadata.table_comment = result['table_comment']
        #     metadata.save()
项目:wizard    作者:honor100    | 项目源码 | 文件源码
def fetch_mysql_metadata(cluster):
    cluster_name = cluster.cluster_name
    cluster_port = cluster.cluster_port
    # ????????????????????????
    dictSlaveConn = getSlaveConnStr(cluster_name)
    if dictSlaveConn:
        dictConn = dictSlaveConn
    else:
        dictConn = getMasterConnStr(cluster_name)
    Host = dictConn['Host']
    Port = dictConn['Port']
    User = dictConn['User']
    Password = dictConn['Password']
    # ?????????
    sqlContent = "select table_schema,table_name,table_type,engine,row_format,table_rows,avg_row_length," \
                 "data_length,max_data_length,index_length,data_free,auto_increment,create_time,table_collation," \
                 "create_options,table_comment from tables where table_schema not in {}".format(
        ('mysql', 'sys', 'mondmm', 'test', 'information_schema', 'performance_schema'), )
    cluster_db = "information_schema"
    try:
        _, results = mdb_query(sqlContent, Host, Port, User, Password, cluster_db, True)
    except Exception as e:
        print("Mysql Error : %s" % e)
        return None
    # ??????????
    for result in results:
        table_schema = result.get('table_schema')
        table_name = result.get('table_name')
        _, res = mdb_query("show create table {}".format(table_name), Host, Port, User, Password, table_schema)
        create_statement = pymysql.escape_string(res[0][1])
        result['create_statement'] = create_statement
        # ?????,????????????????.   (cluster_port, table_schema, table_name??????unique??)
        metadatas = mysql_cluster_metadata.objects.filter(
            Q(cluster_port=cluster_port) & Q(table_schema=table_schema) & Q(table_name=table_name))
        if metadatas:
            metadata = metadatas[0]
        else:
            metadata = mysql_cluster_metadata()
        metadata.cluster_name = cluster_name
        metadata.cluster_port = cluster_port
        metadata.table_schema = result['table_schema']
        metadata.table_name = result['table_name']
        metadata.table_type = result['table_type']
        metadata.engine = result['engine']
        metadata.row_format = result['row_format']
        metadata.table_rows = result['table_rows'] if result['table_rows'] else 0
        metadata.avg_row_length = result['avg_row_length']
        metadata.data_length = result['data_length']
        metadata.max_data_length = result['max_data_length']
        metadata.index_length = result['index_length']
        metadata.data_free = result['data_free']
        metadata.auto_increment = result['auto_increment'] if result['auto_increment'] else 0
        metadata.create_time = result['create_time']
        metadata.table_collation = result['table_collation']
        metadata.create_statement = result['create_statement']
        metadata.create_options = result['create_options']
        metadata.table_comment = result['table_comment']
        metadata.save()
项目:wyproxy    作者:ring04h    | 项目源码 | 文件源码
def add_numbers():
    search = request.args.get('s')
    if not search or ':' not in search or "'" in search:
        return redirect('/')
    page = request.args.get('p', 1, type=int)
    page = page if page > 0 else 1

    limits = '{},{}'.format((page-1)*show_cnt, show_cnt)
    order = 'id desc'

    search_str = search.split(' ')
    params = {}
    for param in search_str:
        name, value = param.split(':')
        if name not in ['host', 'port', 'status_code','method', 'type', 'content_type', 'scheme', 'extension']:
            return redirect('/')
        params[name] = value

    condition = comma = ''
    glue = ' AND '
    for key, value in params.iteritems():
        if ',' in value and key in ['port','status_code','method','type']:
            values = [escape_string(x) for x in value.split(',')]
            condition +=  "{}`{}` in ('{}')".format(comma, key, "', '".join(values))
        elif key in ['host']:
            condition +=  "{}`{}` like '%{}'".format(comma, key, escape_string(value))
        else:
            condition +=  "{}`{}` = '{}'".format(comma, key, escape_string(value))
        comma = glue

    dbconn = connect_db()
    count_sql = 'select count(*) as cnt from capture where {}'.format(condition)
    record_size = int(dbconn.query(count_sql, fetchone=True).get('cnt'))

    max_page = record_size/show_cnt + 1

    records = dbconn.fetch_rows(
                table='capture',
                condition=condition,
                order=order,
                limit=limits)

    return render_template(
                    'index.html', 
                    records=records, 
                    page=page,
                    search=search,
                    max_page=max_page)