我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用pymysql.escape_string()。
def process_item(self, item, spider): if self.__class__.__name__ in spider.pipelines: try: table = { 'jp': 'tabelog_jp', 'en': 'tabelog_en' } table_index = item['lang'] table_name = table[table_index] sql = "insert into %s (store_id, `name`, address, center, nearest_station, main_type, sub_type, sub_sub_type, tag, average_rating, dinner_rating, " \ "lunch_rating, rating_review_num, dinner_price, lunch_price, photo_num, tel, sub_tel, open_time, seats_num, drink, opening_day, city_name, area_name, url) " \ "values(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" data = (table_name, item['store_id'], item['name'], item['address'], item['center'], item['nearest_station'], pymysql.escape_string(item['main_type']), pymysql.escape_string(item['sub_type']), item['sub_sub_type'], item['tag'], item['average_rating'], item['dinner_rating'], item['lunch_rating'], item['rating_review_num'], item['dinner_price'], item['lunch_price'], item['photo_num'], item['tel'], item['sub_tel'], item['open_time'], item['seats_num'], item['drink'], item['opening_day'], item['city_name'], item['area_name'], item['url']) conn, cur = connDB() cur.execute(sql % data) conn.commit() cur.close() conn.close() except: print("********data exists********")
def upsert_seed_list(self, torrent_info): tid, name, tracker = torrent_info while True: if name in self.cache_torrent_name: raw_sql = "UPDATE `seed_list` SET `{cow}` = {id:d} WHERE `title`='{name}'" break else: exist = "SELECT COUNT(*) FROM `seed_list` WHERE `title`='{}'".format(pymysql.escape_string(name)) if self.exec(sql=exist)[0] == 0: raw_sql = "INSERT INTO `seed_list` (`title`,`{cow}`) VALUES ('{name}',{id:d})" break else: self.cache_torrent_list() sql = raw_sql.format(cow=tracker, name=pymysql.escape_string(name), id=tid) return self.exec(sql=sql)
def test_escape_fallback_encoder(self): con = self.connections[0] cur = con.cursor() class Custom(str): pass mapping = {text_type: pymysql.escape_string} self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")
def escape(self, string): return pymysql.escape_string(string)
def join_field_value(self, data, glue = ', '): sql = comma = '' for key, value in data.iteritems(): if isinstance(value, str): value = pymysql.escape_string(value) sql += "{}`{}` = '{}'".format(comma, key, value) comma = glue return sql
def escape(self, s): ''' ???, ???? ? SQL ????? ??? ????? ? ??? ESCAPE?? ????. ''' if s is None: return None return pymysql.escape_string(s)
def register_backend(self, backend): """Register Galera node in ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ if backend.comment: comment = "'%s'" % pymysql.escape_string(backend.comment) else: comment = 'NULL' query = "REPLACE INTO mysql_servers(`hostgroup_id`," \ " `hostname`, `port`," \ " `status`, `weight`, `compression`, `max_connections`," \ " `max_replication_lag`, `use_ssl`, `max_latency_ms`," \ " `comment`) " \ "VALUES({hostgroup_id}, '{hostname}', {port}," \ " '{status}', {weight}, {compression}, {max_connections}," \ " {max_replication_lag}, {use_ssl}, {max_latency_ms}," \ " {comment})" \ "".format(hostgroup_id=int(backend.hostgroup_id), hostname=pymysql.escape_string(backend.hostname), port=int(backend.port), status=pymysql.escape_string(backend.status), weight=int(backend.weight), compression=int(backend.compression), max_connections=int(backend.max_connections), max_replication_lag=int(backend.max_replication_lag), use_ssl=int(backend.use_ssl), max_latency_ms=int(backend.max_latency_ms), comment=comment) self.execute(query) self.reload_runtime()
def deregister_backend(self, backend): """ Deregister a Galera node from ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ query = "DELETE FROM mysql_servers WHERE hostgroup_id={hostgroup_id}" \ " AND hostname='{hostname}'" \ " AND port={port}" \ "".format(hostgroup_id=int(backend.hostgroup_id), hostname=pymysql.escape_string(backend.hostname), port=int(backend.port)) self.execute(query) self.reload_runtime()
def get_data_clone_id(self, key, site) -> None or int: clone_id = None key = pymysql.escape_string(re.sub(r"[_\-. ]", "%", key)) sql = "SELECT `{site}` FROM `info_list` WHERE `search_name` LIKE '{key}%'".format(site=site, key=key) try: # Get clone id info from database clone_id = int(self.exec(sql=sql)[0]) except TypeError: # The database doesn't have the search data, Return dict only with raw key. logging.warning( "No record for key: \"{key}\" in \"{site}\". Or may set as `None`".format(key=key, site=site) ) return clone_id
def handler(meta, boardid, id, result, big): """ ??????????????????????? :param meta: ?mpms?? :param boardid: ??id :param id: ??id :param result: ??????? list?? [??lc, ???user, ????content, ????posttime, ??????lastedittime] :param big: ???? ""?"big" :return: ???? """ if len(result) == 0: return if len(result) > 1000: # avoid too long sql handler(meta, boardid, id, result[1000:], big) result = result[:1000] if result[0][0] == 0: # ??????sql????result[0]????????????????????? try: showline = [boardid, id, result[0][2], len(result)] if myip != "": showline.insert(0, myip) # if enables multiple ip, print IP first print(" ".join(str(i) for i in (showline))) except: try: print(" ".join(str(i) for i in (boardid, id, pformat(result[0][2]), len(result)))) except: print("Something cannot print") global conn sql = "insert ignore into {}bbs_{}(id,lc,user,content,posttime,edittime,gettime) values ".format(big, boardid) for i in result: sql += "({},{},\"{}\",\"{}\",\"{}\",\"{}\",now()),".format(id, i[0], pymysql.escape_string(i[1]), pymysql.escape_string(i[2]), i[3], i[4]) # print(sql) sql = sql[:-1] # ??????utf8mb4??????????emoji?? cur = conn.cursor() try: cur.execute( "SET NAMES utf8mb4;SET CHARACTER SET utf8mb4; SET character_set_connection=utf8mb4;") # ????????????? except: conn = db() cur.execute("SET NAMES utf8mb4;SET CHARACTER SET utf8mb4; SET character_set_connection=utf8mb4;") try: cur.execute(sql) conn.commit() except pymysql.err.ProgrammingError as e: # ?????????????????????? createTable(boardid, big=big) cur.execute(sql) conn.commit() except Exception as e: print(e)
def fetch_mysql_cluster_metadata(): clusters = getAllMySQLInfo(cluster_role=1, flag='online') for cluster in clusters: fetch_mysql_metadata(cluster) # cluster_name = cluster.cluster_name # cluster_port = cluster.cluster_port # # ???????????????????????? # dictSlaveConn = getSlaveConnStr(cluster_name) # if dictSlaveConn: # dictConn = dictSlaveConn # else: # dictConn = getMasterConnStr(cluster_name) # Host = dictConn['Host'] # Port = dictConn['Port'] # User = dictConn['User'] # Password = dictConn['Password'] # # ????????? # sqlContent = "select table_schema,table_name,table_type,engine,row_format,table_rows,avg_row_length," \ # "data_length,max_data_length,index_length,data_free,auto_increment,create_time,table_collation," \ # "create_options,table_comment from tables where table_schema not in {}".format( # ('mysql', 'sys', 'mondmm', 'test', 'information_schema', 'performance_schema'),) # cluster_db = "information_schema" # try: # _, results = mdb_query(sqlContent, Host, Port, User, Password, cluster_db, True) # except Exception as e: # print("Mysql Error : %s" % e) # return None # # ?????????? # for result in results: # table_schema = result.get('table_schema') # table_name = result.get('table_name') # _, res = mdb_query("show create table {}".format(table_name), Host, Port, User, Password, table_schema) # create_statement = pymysql.escape_string(res[0][1]) # result['create_statement'] = create_statement # # ?????,????????????????. (cluster_port, table_schema, table_name??????unique??) # metadatas = mysql_cluster_metadata.objects.filter(Q(cluster_port=cluster_port) & Q(table_schema=table_schema) & Q(table_name=table_name)) # if metadatas: # metadata = metadatas[0] # else: # metadata = mysql_cluster_metadata() # metadata.cluster_name = cluster_name # metadata.cluster_port = cluster_port # metadata.table_schema = result['table_schema'] # metadata.table_name = result['table_name'] # metadata.table_type = result['table_type'] # metadata.engine = result['engine'] # metadata.row_format = result['row_format'] # metadata.table_rows = result['table_rows'] if result['table_rows'] else 0 # metadata.avg_row_length = result['avg_row_length'] # metadata.data_length = result['data_length'] # metadata.max_data_length = result['max_data_length'] # metadata.index_length = result['index_length'] # metadata.data_free = result['data_free'] # metadata.auto_increment = result['auto_increment'] if result['auto_increment'] else 0 # metadata.create_time = result['create_time'] # metadata.table_collation = result['table_collation'] # metadata.create_statement = result['create_statement'] # metadata.create_options = result['create_options'] # metadata.table_comment = result['table_comment'] # metadata.save()
def fetch_mysql_metadata(cluster): cluster_name = cluster.cluster_name cluster_port = cluster.cluster_port # ???????????????????????? dictSlaveConn = getSlaveConnStr(cluster_name) if dictSlaveConn: dictConn = dictSlaveConn else: dictConn = getMasterConnStr(cluster_name) Host = dictConn['Host'] Port = dictConn['Port'] User = dictConn['User'] Password = dictConn['Password'] # ????????? sqlContent = "select table_schema,table_name,table_type,engine,row_format,table_rows,avg_row_length," \ "data_length,max_data_length,index_length,data_free,auto_increment,create_time,table_collation," \ "create_options,table_comment from tables where table_schema not in {}".format( ('mysql', 'sys', 'mondmm', 'test', 'information_schema', 'performance_schema'), ) cluster_db = "information_schema" try: _, results = mdb_query(sqlContent, Host, Port, User, Password, cluster_db, True) except Exception as e: print("Mysql Error : %s" % e) return None # ?????????? for result in results: table_schema = result.get('table_schema') table_name = result.get('table_name') _, res = mdb_query("show create table {}".format(table_name), Host, Port, User, Password, table_schema) create_statement = pymysql.escape_string(res[0][1]) result['create_statement'] = create_statement # ?????,????????????????. (cluster_port, table_schema, table_name??????unique??) metadatas = mysql_cluster_metadata.objects.filter( Q(cluster_port=cluster_port) & Q(table_schema=table_schema) & Q(table_name=table_name)) if metadatas: metadata = metadatas[0] else: metadata = mysql_cluster_metadata() metadata.cluster_name = cluster_name metadata.cluster_port = cluster_port metadata.table_schema = result['table_schema'] metadata.table_name = result['table_name'] metadata.table_type = result['table_type'] metadata.engine = result['engine'] metadata.row_format = result['row_format'] metadata.table_rows = result['table_rows'] if result['table_rows'] else 0 metadata.avg_row_length = result['avg_row_length'] metadata.data_length = result['data_length'] metadata.max_data_length = result['max_data_length'] metadata.index_length = result['index_length'] metadata.data_free = result['data_free'] metadata.auto_increment = result['auto_increment'] if result['auto_increment'] else 0 metadata.create_time = result['create_time'] metadata.table_collation = result['table_collation'] metadata.create_statement = result['create_statement'] metadata.create_options = result['create_options'] metadata.table_comment = result['table_comment'] metadata.save()
def add_numbers(): search = request.args.get('s') if not search or ':' not in search or "'" in search: return redirect('/') page = request.args.get('p', 1, type=int) page = page if page > 0 else 1 limits = '{},{}'.format((page-1)*show_cnt, show_cnt) order = 'id desc' search_str = search.split(' ') params = {} for param in search_str: name, value = param.split(':') if name not in ['host', 'port', 'status_code','method', 'type', 'content_type', 'scheme', 'extension']: return redirect('/') params[name] = value condition = comma = '' glue = ' AND ' for key, value in params.iteritems(): if ',' in value and key in ['port','status_code','method','type']: values = [escape_string(x) for x in value.split(',')] condition += "{}`{}` in ('{}')".format(comma, key, "', '".join(values)) elif key in ['host']: condition += "{}`{}` like '%{}'".format(comma, key, escape_string(value)) else: condition += "{}`{}` = '{}'".format(comma, key, escape_string(value)) comma = glue dbconn = connect_db() count_sql = 'select count(*) as cnt from capture where {}'.format(condition) record_size = int(dbconn.query(count_sql, fetchone=True).get('cnt')) max_page = record_size/show_cnt + 1 records = dbconn.fetch_rows( table='capture', condition=condition, order=order, limit=limits) return render_template( 'index.html', records=records, page=page, search=search, max_page=max_page)