我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用pymysql.connect()。
def request(query, verbose=False): try: db = pymysql.connect(host="localhost",user="yann",passwd="yann",db="doctorat") except Exception: print("Error in MySQL connexion") else: cur = db.cursor() try: cur.execute(query) except Exception: print("Error with query: " + query) else: db.commit() result = cur.fetchall() print(result) db.close()
def catch_ptt_max_index(ptt_class_name,sql_name): conn = ( pymysql.connect(host = '114.34.138.146', port = 3306, user = user, password = password, database = database, charset="utf8") ) cursor = conn.cursor() cursor.execute('SELECT * FROM `'+ sql_name +'` ORDER BY id DESC LIMIT 1;') data = cursor.fetchone() index = data[10] index = index.replace('http://www.ptt.cc/bbs/'+ ptt_class_name +'/index','') index = index.replace('.html','') index = int(index) cursor.close() conn.close() return index #--------------------------------------------------------------------------------- #--------------------------------------------------------------------------------- # ? SQL ? data ???, ?data?????? SQL ? max data time
def catch_ptt_history_date_time(ptt_class_name,sql_name): conn = ( pymysql.connect(host = '114.34.138.146', port = 3306, user = user, password = password, database = database, charset="utf8") ) cursor = conn.cursor() cursor.execute('SELECT * FROM `'+ sql_name +'` ORDER BY id DESC LIMIT 1;') data = cursor.fetchone() date_time = date_to_numeric( data[1] ) cursor.close() conn.close() return date_time #--------------------------------------------------------------------------------- # ?????? index, index ???????, ????????? index
def logged_in(self, response): conn = pymysql.connect(**DB_CONFIG) cursor = conn.cursor() sql = 'select * from section' cursor.execute(sql) for row in cursor.fetchall(): item = ByrbbsArticleItem() item['section_url'] = row[1] yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS, callback=self.parse_article_list) # ??????????? # self.start_urls = ['https://bbs.byr.cn/board/BM_Market'] # item = ByrbbsArticleItem() # item['section_url'] = 'board/BM_Market' # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item}, # headers=HEADERS, callback=self.parse_article_list) # ??????????????????????
def logged_in(self, response): conn = pymysql.connect(**DB_CONFIG) cursor = conn.cursor() sql = 'select * from section' cursor.execute(sql) for row in cursor.fetchall(): item = ByrbbsArticleItem() item['section_url'] = row[1] yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS, callback=self.parse_article_list_pre) # ??????????? # self.start_urls = ['https://bbs.byr.cn/board/BUPTPost'] # item = ByrbbsArticleItem() # item['section_url'] = 'BUPTPost' # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item}, # headers=HEADERS, callback=self.parse_article_list) # ?????????
def process_item(self, item, spider): con = pymysql.connect(**DB_CONFIG) cur = con.cursor() sql = 'insert into articleinfo(section_url,article_title,' \ 'article_url,article_comment,article_author,article_createtime,updatetime) ' \ 'values(%s,%s,%s,%s,%s,%s,%s)' values = (item['section_url'], item['article_title'], item['article_url'], item['article_comment'], item['article_author'], item['article_createtime'], item['updatetime']) cur.execute(sql, values) # second parameter must be iterabale # sql2 = 'insert into articlebody(article_url,article_content) values(%s,%s)' # values2 = (item['article_url'], item['article_content']) # cur.execute(sql2, values2) con.commit() cur.close() con.close() return item
def __init__(self): QtWidgets.QMainWindow.__init__(self) Ui_MainWindow.__init__(self) self.setupUi(self) self.__register__ = None self.__attendance____ = None self.btn_Register.clicked.connect(self.Register) self.btn_Attendance.clicked.connect(self.Attendance) self.btnSearch.clicked.connect(self.Search) self.report_date.setDate(QtCore.QDate.currentDate()) cursor = connection.cursor() sql = "Select * from attendance" cursor.execute(sql) result = cursor.fetchall() rows = len(result) if rows <= 0: QMessageBox.about(self, "No Data", "No Attendance has been recorded yet") else: self.tableWidget.setRowCount(rows) self.tableWidget.setColumnCount(3) header_labels = ['Matric Number', 'Date', 'Status'] self.tableWidget.setHorizontalHeaderLabels(header_labels) for count in range(0, rows): self.tableWidget.setItem(count, 0, QTableWidgetItem(str(result[count]["matric_num"].encode('ascii', 'ignore')))) self.tableWidget.setItem(count, 1, QTableWidgetItem(result[count]["dte"].encode('ascii', 'ignore'))) self.tableWidget.setItem(count, 2, QTableWidgetItem(result[count]["status"].encode('ascii', 'ignore')))
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4): ''' :param host: <class str|host name> :param user: <class str|user name> :param password: <class str|password> :param db: <class str|database name> :param charset: default='utf8' <class str> :param logFile: default=None <class str> :param color: default=True <class bool> :param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARING],3 [Error][WARING][INFO],4 ALL> ''' self.logFile = logFile self.color = color self.debug = debug self.success = 0 self.fail = 0 self.repeat = 0 self._conn = MYSQL.connect(host,user,password,db,charset=charset) self._cursor = self._conn.cursor()
def connect(self, **kwargs): """ Connect :param path: sqlite file to connect """ host = kwargs['host'] port = kwargs['port'] user = kwargs['user'] pwd = kwargs['pwd'] schema = kwargs['schema'] self.conn = pymysql.connect(host=host, port=port, user=user, password=pwd, db=schema, charset='utf8mb4', cursorclass=pymysql.cursors.SSDictCursor) self.cursor = self.conn.cursor() return self.conn is not None and self.cursor is not None
def test_context(self): with self.assertRaises(ValueError): c = pymysql.connect(**self.databases[0]) with c as cur: cur.execute('create table test ( a int )') c.begin() cur.execute('insert into test values ((1))') raise ValueError('pseudo abort') c.commit() c = pymysql.connect(**self.databases[0]) with c as cur: cur.execute('select count(*) from test') self.assertEqual(0, cur.fetchone()[0]) cur.execute('insert into test values ((1))') with c as cur: cur.execute('select count(*) from test') self.assertEqual(1,cur.fetchone()[0]) cur.execute('drop table test')
def test_defer_connect(self): import socket for db in self.databases: d = db.copy() try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(d['unix_socket']) except KeyError: sock = socket.create_connection( (d.get('host', 'localhost'), d.get('port', 3306))) for k in ['unix_socket', 'host', 'port']: try: del d[k] except KeyError: pass c = pymysql.connect(defer_connect=True, **d) self.assertFalse(c.open) c.connect(sock) c.close() sock.close()
def test_issue_17(self): """could not connect mysql use passwod""" conn = self.connections[0] host = self.databases[0]["host"] db = self.databases[0]["db"] c = conn.cursor() # grant access to a table to a user with a password try: with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue17") c.execute("create table issue17 (x varchar(32) primary key)") c.execute("insert into issue17 (x) values ('hello, world!')") c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db) conn.commit() conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db) c2 = conn2.cursor() c2.execute("select x from issue17") self.assertEqual("hello, world!", c2.fetchone()[0]) finally: c.execute("drop table issue17")
def test_issue_114(self): """ autocommit is not set after reconnecting with ping() """ conn = pymysql.connect(charset="utf8", **self.databases[0]) conn.autocommit(False) c = conn.cursor() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() conn.ping() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() # Ensure autocommit() is still working conn = pymysql.connect(charset="utf8", **self.databases[0]) c = conn.cursor() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() conn.ping() conn.autocommit(True) c.execute("""select @@autocommit;""") self.assertTrue(c.fetchone()[0]) conn.close()
def test_issue_491(self): """ Test warning propagation """ conn = pymysql.connect(charset="utf8", **self.databases[0]) with warnings.catch_warnings(): # Ignore all warnings other than pymysql generated ones warnings.simplefilter("ignore") warnings.simplefilter("error", category=pymysql.Warning) # verify for both buffered and unbuffered cursor types for cursor_class in (cursors.Cursor, cursors.SSCursor): c = conn.cursor(cursor_class) try: c.execute("SELECT CAST('124b' AS SIGNED)") c.fetchall() except pymysql.Warning as e: # Warnings should have errorcode and string message, just like exceptions self.assertEqual(len(e.args), 2) self.assertEqual(e.args[0], 1292) self.assertTrue(isinstance(e.args[1], text_type)) else: self.fail("Should raise Warning") finally: c.close()
def test_example(self): conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql') cur = conn.cursor() cur.execute("SELECT Host,User FROM user") # print cur.description # r = cur.fetchall() # print r # ...or... u = False for r in cur.fetchall(): u = u or conn.user in r self.assertTrue(u) cur.close() conn.close()
def __init__(self,dbname,key,city): self.dbname = dbname self.T = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d%H%M") # ???????? self.table_name = "{}_{}_{}".format(self.T,key,city) # ????? self.conn = pymysql.connect( host="localhost", port=3306, user='root', password='python', db=self.dbname, charset='utf8' ) # ???? self.cursor = self.conn.cursor()
def load_db_sqlite(dicts, dbname, table): ''' dicts: a dictionary dbname: sqlite3 database file name ''' import sqlite3 conn = sqlite3.connect(dbname) curs = conn.cursor() curs.execute(''' create table IF NOT EXISTS ? (time char(100), author char(10), url char(100), title char(150), content char(10000)) ''', table) curs.execute(''' INSERT INTO `articles` (`time`, `author`, `url`, `title`, `content`) VALUES (?,?,?,?,?) ''', dicts) conn.commit() conn.close()
def main(): mysql_config = secret_config['mysql'] db = pymysql.connect(host=mysql_config['host'], port=mysql_config['port'], db=mysql_config['db'], user=mysql_config['user'], passwd=mysql_config['passwd']) msg = "CAUTION! This will drop all tables and probably going to cause loss of data. Are you sure want to continue?" approved = input("%s (y/N) " % msg).lower() == 'y' if not approved: return print("-->Dropping tables from \"" + mysql_config['db'] + "\"...") clear_db(db) print("-->Tables dropped") print("-->Creating tables on \"" + mysql_config['db'] + "\"...") init_db(db) print("-->Tables created.")
def connect(self, **kwargs): """ Connect :param path: sqlite file to connect """ host = kwargs['host'] port = kwargs['port'] user = kwargs['user'] pwd = kwargs['pwd'] schema = kwargs['schema'] self.conn = pymysql.connect(host=host, port=port, user=user, password=pwd, db=schema, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) self.cursor = self.conn.cursor() return self.conn is not None and self.cursor is not None
def connect(self,tryTimes=1): i = 0 while i < tryTimes : try : self.conn = pymysql.connect( host=self.host, user=self.user, port=self.port, passwd=self.passwd, db=self.db, charset="utf8", ) self.logger.debug("connect ok!current retry : {0}".format(i)) break except Exception: i += 1 self.logger.error(traceback.format_exc()) return self.conn
def create_connection(arguments): """Create a connection to the SQL database using arguments.""" arguments = vars(arguments) if on_forge(): arguments["host"] = "tools-db" arguments["db"] = "s51138__heritage_p" credentials = get_db_credentials() arguments["user"] = credentials["user"] arguments["password"] = credentials["password"] return pymysql.connect( host=arguments["host"], user=arguments["user"], password=arguments["password"], db=arguments["db"], charset="utf8")
def good_token(email:str, uri:str) -> bool: """ Confirms whether an email and uri pair are valid. :param email the email which we are testing the uri for :param uri the identifier token which we geerated and sent :returns True if the token is valid (i.e. sent by us to this email), False otherwise (including if a DB error occured) """ with sqlite3.connect(p.DBNAME) as conn: c = conn.cursor() c.execute("SELECT * FROM uris WHERE uri=?", (uri,)) row = c.fetchone() if not row or row[0] != email: return False return True
def Open(self, p_autocommit=True): try: self.v_con = psycopg2.connect( self.GetConnectionString(), cursor_factory=psycopg2.extras.DictCursor ) self.v_con.autocommit = p_autocommit self.v_cur = self.v_con.cursor() self.v_start = True # PostgreSQL types self.v_cur.execute('select oid, typname from pg_type') self.v_types = dict([(r['oid'], r['typname']) for r in self.v_cur.fetchall()]) if not p_autocommit: self.v_con.commit() self.v_con.notices = DataList() except Spartacus.Database.Exception as exc: raise exc except psycopg2.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def __init__(self, block=None, server_id=None, log_file=None, log_pos=None, host=None, user=None, passwd=None, rollback=None, port=None, gtid=None, _thread_id=None, stop_pos=None): import pymysql _remote_filed._gtid = gtid _remote_filed._thread_id = _thread_id self._stop_pos = stop_pos self._log_file = log_file self._log_pos = log_pos self.block = block if block != None else False self.server_id = server_id if server_id != None else 133 self.port = port if port != None else 3306 self.connection = pymysql.connect(host=host, user=user, password=passwd, port=self.port, db='', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) if rollback: _remote_filed._rollback_status = True _rollback._myfunc = GetRollStatement(host=host, user=user, passwd=passwd, port=self.port) self.ReadPack()
def test_issue_17(self): """ could not connect mysql use passwod """ conn = self.connections[0] host = self.databases[0]["host"] db = self.databases[0]["db"] c = conn.cursor() # grant access to a table to a user with a password try: c.execute("create table issue17 (x varchar(32) primary key)") c.execute("insert into issue17 (x) values ('hello, world!')") c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db) conn.commit() conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db) c2 = conn2.cursor() c2.execute("select x from issue17") self.assertEqual("hello, world!", c2.fetchone()[0]) finally: c.execute("drop table issue17")
def process_item(self, item, spider): if self.__class__.__name__ in spider.pipelines: try: conn = pymysql.connect(host='localhost', user='root', passwd='067116', db='gaode', charset='utf8') cur = conn.cursor() sql = "insert into %s(uid,`name`,address,tag,sub_tag,center,tel,pro_name,pro_center,city_name," \ "city_center,ad_name,ad_center,photo_url1,photo_url2,photo_url3,photo_exists)" \ "values('%s','%s','%s','%s','%s', '%s', '%s', '%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" data = (item['table_name'], item['uid'], item['name'], item['address'], item['tag'], item['sub_tag'], item['center'], item['tel'], item['pro_name'], item['pro_center'], item['city_name'], item['city_center'], item['ad_name'], item['ad_center'], item['photo_url1'], item['photo_url2'], item['photo_url3'], item['photo_exists']) cur.execute(sql % data) conn.commit() cur.close() conn.close() except: pass else: return item
def process_item(self, item, spider): if self.__class__.__name__ in spider.pipelines: try: conn = pymysql.connect(host='localhost', user='root', passwd='067116', db='gaode', charset='utf8') cur = conn.cursor() sql = 'insert into xishiduo(uid,`name`,address,tag,sub_tag,center,tel,pro_name,pro_center,city_name,' \ 'city_center,ad_name,ad_center,distance,photo_urls,photo_exists) ' \ 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)' data = (item['uid'], item['name'], item['address'], item['tag'], item['sub_tag'], item['center'], item['tel'], item['pro_name'], item['pro_center'], item['city_name'], item['city_center'], item['ad_name'], item['ad_center'], item['distance'], item['photo_urls'], item['photo_exists']) cur.execute(sql, data) conn.commit() cur.close() conn.close() self.i += 1 print(self.i) except: pass else: return item
def process_item(self, item, spider): if self.__class__.__name__ in spider.pipelines: try: conn = pymysql.connect(host='localhost', user='root', passwd='067116', db='gaode', charset='utf8') cur = conn.cursor() sql = 'insert into test(uid,`name`,address,tag,sub_tag,center,tel,pro_name,pro_center,city_name,' \ 'city_center,ad_name,ad_center,distance,photo_urls,photo_exists,distributor) ' \ 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)' data = (item['uid'], item['name'], item['address'], item['tag'], item['sub_tag'], item['center'], item['tel'], item['pro_name'], item['pro_center'], item['city_name'], item['city_center'], item['ad_name'], item['ad_center'], item['distance'], item['photo_urls'], item['photo_exists'], item['distributor']) cur.execute(sql, data) conn.commit() cur.close() conn.close() except: print("**********exists**********") else: return item
def process_item(self, item, spider): conn = pymysql.connect(host='localhost', user='root', passwd='067116', db='baike', charset='utf8') cur = conn.cursor() if len(item) > 1: try: sql = "insert into baidubaike(title,summary,basic_info,level2,pv,item_id,last_update_time,url) values(%s,%s,%s,%s,%s,%s,%s,%s)" data = (item['title'], item['summary'], json.dumps(item['basic_info']), json.dumps(item['level2']), item['pv'], item['item_id'], item['last_update_time'], item['url']) cur.execute(sql, data) conn.commit() cur.close() conn.close() except: print("exists,id is %s" % item['item_id']) else: try: sql = "insert into error(url) values('%s')" cur.execute(sql % item['url']) conn.commit() cur.close() conn.close() except: print("error page exists")
def __init__(self,block = None,server_id = None,log_file = None, log_pos = None,host=None,user=None,passwd=None,rollback=None, port = None,gtid = None,_thread_id = None,stop_pos=None): import pymysql _remote_filed._gtid = gtid _remote_filed._thread_id = _thread_id self._stop_pos = stop_pos self._log_file = log_file self._log_pos = log_pos self.block = block if block != None else False self.server_id = server_id if server_id != None else 133 self.port = port if port != None else 3306 self.connection = pymysql.connect(host=host, user=user, password=passwd,port=self.port, db='', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) if rollback: _remote_filed._rollback_status = True _rollback._myfunc = GetRollStatement(host=host,user=user,passwd=passwd,port=self.port) self.ReadPack()
def creat_sql_file(sql_string,dataset_name): conn = ( pymysql.connect(host = '114.34.138.146',# SQL IP port = 3306, user = user,# ?? password = password,# ?? database = database, # ????? charset="utf8") ) # ?? c=conn.cursor() c.execute( sql_string )# ???? SQL file c.execute('ALTER TABLE `'+dataset_name+'` ADD id BIGINT(10) NOT NULL AUTO_INCREMENT PRIMARY KEY;') c.close() # ??? SQL ??? conn.close()# ??? SQL ??? #--------------------------------------------------------------------------------- # ? ptt ??
def creat_sql_file(sql_string,dataset_name): conn = ( pymysql.connect(host = '114.34.138.146',# SQL IP port = 3306, user='guest',# ?? password='123',# ?? database='guest_dataset', # ????? charset="utf8") ) # ?? c=conn.cursor() c.execute( sql_string )# ???? SQL file # ? PRIMARY KEY c.execute('ALTER TABLE `'+dataset_name+'` ADD id BIGINT(10) NOT NULL AUTO_INCREMENT PRIMARY KEY;') c.close() # ??? SQL ??? conn.close()# ??? SQL ???
def connect(self): await self.instance.db.connect() await self.instance.apps.discover() await self.instance.db.initiate() if self.db_type != 'mysql': raise Exception('We only support mysql converting right now!') self.connection = pymysql.connect( host=self.db_host, user=self.db_user, password=self.db_password, db=self.db_name, charset=self.charset, port=self.db_port or 3306, cursorclass=pymysql.cursors.DictCursor )
def start(self): if not self.connection: raise Exception('Please connect first (connect()).') return await self.migrate(self.connection)
def __init__(self,host='marshal.netsec.colostate.edu',user='root',passwd='****',dbname='bgp_archive'): #Prepare DB info self.db = pymysql.connect(host=host, user=user, passwd=passwd, db=dbname) self.dbname=dbname
def _open(self): """ DO NOT USE THIS UNLESS YOU close() FIRST""" try: self.db = connect( host=self.settings.host, port=self.settings.port, user=coalesce(self.settings.username, self.settings.user), passwd=coalesce(self.settings.password, self.settings.passwd), db=coalesce(self.settings.schema, self.settings.db), charset=u"utf8", use_unicode=True, ssl=coalesce(self.settings.ssl, None), cursorclass=cursors.SSCursor ) except Exception as e: if self.settings.host.find("://") == -1: Log.error(u"Failure to connect to {{host}}:{{port}}", host= self.settings.host, port= self.settings.port, cause=e ) else: Log.error(u"Failure to connect. PROTOCOL PREFIX IS PROBABLY BAD", e) self.cursor = None self.partial_rollback = False self.transaction_level = 0 self.backlog = [] # accumulate the write commands so they are sent at once
def insert(sql): conn = pymysql.connect("localhost", "root", "root", "test", port=3306, charset='utf8') cur = conn.cursor() print(sql) sta = cur.execute(sql) if sta == 1: print('Done sql') else: print('Failed sql.') conn.commit() cur.close() conn.close() # Step2.??HTML???????MYSQL
def __init__(self): # load(sys) self.conn = pymysql.connect(host="127.0.0.1", user="orange", password="@orangeLIU3226677zc", port=3306, db='customs_data', charset='utf8') self.conn.set_charset("utf8")
def process_item(self, item, spider): # ?spider.name?????spider # https://segmentfault.com/q/1010000004863755 con = pymysql.connect(**DB_CONFIG) cur = con.cursor() sql = 'insert into section(section_url,section_name,section_article_total,top_section_num,top_section_name,updatetime) ' \ 'values(%s,%s,%s,%s,%s,%s)' values = (item['section_url'], item['section_name'], item['section_article_total'], item['top_section_num'], item['top_section_name'], item['updatetime']) cur.execute(sql, values) # second parameter must be iterabale con.commit() cur.close() con.close() return item
def process_item(self, item, spider): con = pymysql.connect(**DB_CONFIG) cur = con.cursor() sql = 'replace into articleinfohour(section_url,article_title,' \ 'article_url,article_comment,article_author,article_createtime,updatetime) ' \ 'values(%s,%s,%s,%s,%s,%s,%s)' values = (item['section_url'], item['article_title'], item['article_url'], item['article_comment'], item['article_author'], item['article_createtime'], item['updatetime']) cur.execute(sql, values) # second parameter must be iterabale con.commit() cur.close() con.close() return item