Python MySQLdb 模块,Connect() 实例源码

我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用MySQLdb.Connect()

项目:BackManager    作者:linuxyan    | 项目源码 | 文件源码
def getConnection(self):
        return MySQLdb.Connect(
                           host=self.DB_HOST,
                           port=self.DB_PORT,
                           user=self.DB_USER,
                           passwd=self.DB_PWD,
                           db=self.DB_NAME,
                           charset='utf8'
                           )
项目:BackManager    作者:linuxyan    | 项目源码 | 文件源码
def getConnection(self):
        return MySQLdb.Connect(
                           host=self.DB_HOST,
                           port=self.DB_PORT,
                           user=self.DB_USER,
                           passwd=self.DB_PWD,
                           db=self.DB_NAME,
                           charset='utf8'
                           )
项目:cscoins    作者:csgames    | 项目源码 | 文件源码
def connect(self):
        conn = None
        try:
            conn = MySQLdb.Connect(
                host=self.hostname,
                port=self.port,
                user=self.username,
                password=self.password,
                db=self.db)
        except Exception as e:
            print("MySQL Connection error : {0}".format(e))

        return conn
项目:cscoins    作者:BrandonWade    | 项目源码 | 文件源码
def connect(self):
        conn = None
        try:
            conn = MySQLdb.Connect(host=self.hostname, port=self.port, user=self.username, password=self.password, db=self.db)
        except Exception as e:
            print("MySQL Connection error : {0}".format(e))

        return conn
项目:python    作者:hienha    | 项目源码 | 文件源码
def getConn(conninfo):
    host = conninfo['host']
    user = conninfo['user']
    password = conninfo['password']
    port = conninfo['port']
    sock = conninfo['sock']
    db = conninfo['db']
    # print host, user, password, port, db
    try:
        conn = MySQLdb.Connect(host=host, user=user, passwd=password, port=port, unix_socket=sock, db=db, charset='utf8')
    except Exception as e:
        print e
        sys.exit()

    return conn
项目:myquerykill    作者:seanlook    | 项目源码 | 文件源码
def db_reconnect(db_user, db_id):
    db_pass = settings.DB_AUTH[db_user]
    pc = prpcryptec.prpcrypt(KEY_DB_AUTH)

    db_instance = get_setttings("db_info", db_id)
    db_host, db_port = db_instance.replace(' ', '').split(':')

    db_conn = None

    while not db_conn:
        try:
            logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s",
                        db_id, db_host, db_user, db_port)
            db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port),
                                      connect_timeout=5, use_unicode=False)

        except MySQLdb.Error, e:

            if e.args[0] in (2013, 2003):
                logger.critical('Error %d: %s', e.args[0], e.args[1])
                logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s",
                            db_id, db_host, db_user, db_port)
                db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port),
                                          connect_timeout=5, use_unicode=False)

        except Exception as inst:
            print "Error %s %s" % type(inst), inst.args.__str__()

        time.sleep(10)

    return db_conn


# judge this thread meet kill_opt or not
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def __init__(self):
        self.__cfgReporting = Config(os.path.join(RAGPICKER_ROOT, 'config', 'reporting.conf'))
        self.__mysqlEnabled = self.__cfgReporting.getOption("mysql", "enabled")

        if self.__mysqlEnabled:
            #Anbindung an Datenbank MySQL herstellen
            try:
                mysqlHost = self.__cfgReporting.getOption("mysql", "host")
                mysqlPort = self.__cfgReporting.getOption("mysql", "port")
                mysqlDatabase = self.__cfgReporting.getOption("mysql", "database")
                mysqlUser = self.__cfgReporting.getOption("mysql", "user")
                mysqlPassword = self.__cfgReporting.getOption("mysql", "password")
                self.__mysqlConnection = MySQLdb.Connect(host=mysqlHost, port=mysqlPort, db=mysqlDatabase, user=mysqlUser, passwd=mysqlPassword)
            except (Exception) as e:
                raise Exception("Cannot connect to MySQL (ragpicker): %s" % e)
项目:OnlineSchemaChange    作者:facebookincubator    | 项目源码 | 文件源码
def default_get_mysql_connection(
        user_name, user_pass, socket, dbname='',
        timeout=60, connect_timeout=10, charset=None):
    """
    Default method for connection to a MySQL instance.
    You can override this behaviour by define/import in cli.py and pass it to
    Payload at instantiation time.
    The function should return a valid Connection object just as
    MySQLdb.Connect does.
    """
    connection_config = {
        'user': user_name,
        'passwd': user_pass,
        'unix_socket': socket,
        'db': dbname,
        'use_unicode': True,
        'connect_timeout': connect_timeout
    }
    if charset:
        connection_config['charset'] = charset
    dbh = MySQLdb.Connect(**connection_config)
    dbh.autocommit(True)
    if timeout:
        cursor = dbh.cursor()
        cursor.execute("SET SESSION WAIT_TIMEOUT = %s", (timeout,))
    return dbh
项目:NewBisMonitor    作者:cCloudSky    | 项目源码 | 文件源码
def c2db(self):
        self.conn = MySQLdb.Connect(
            host = self.host,
            port = self.port,
            user = self.user,
            passwd = self.passwd,
            db = self.db,
            )
        self.cur = self.conn.cursor()
项目:DBschema_gather    作者:seanlook    | 项目源码 | 文件源码
def query_table_info(*dbinfo):

    sql_str = """
    SELECT
        IFNULL(@@hostname, @@server_id) SERVER_NAME,
        %s as HOST,
        t.TABLE_SCHEMA,
        t.TABLE_NAME,
        t.TABLE_ROWS,
        t.DATA_LENGTH,
        t.INDEX_LENGTH,
        t.AUTO_INCREMENT,
      c.COLUMN_NAME,
      c.DATA_TYPE,
      LOCATE('unsigned', c.COLUMN_TYPE) COL_UNSIGNED
      # CONCAT(c.DATA_TYPE, IF(LOCATE('unsigned', c.COLUMN_TYPE)=0, '', '_unsigned'))
    FROM
        information_schema.`TABLES` t
    LEFT JOIN information_schema.`COLUMNS` c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA
    AND t.TABLE_NAME = c.TABLE_NAME
    AND c.EXTRA = 'auto_increment'
    WHERE
        t.TABLE_SCHEMA NOT IN (
            'mysql',
            'information_schema',
            'performance_schema',
            'sys'
        )
    AND t.TABLE_TYPE = 'BASE TABLE'
    """
    dbinfo_str = "%s:%d" % (dbinfo[0], dbinfo[1])
    rs = ({},)
    try:
        db_conn = MySQLdb.Connect(host=dbinfo[0], port=dbinfo[1], user=dbinfo[2], passwd=dbinfo[3],
                                  connect_timeout=5)
        cur = db_conn.cursor(MySQLdb.cursors.DictCursor)
        param = (dbinfo_str,)

        print "\n[%s] Get schema info from db: '%s'..." % (datetime.today(), dbinfo_str)
        cur.execute(sql_str, param)

        rs = cur.fetchall()

    except MySQLdb.Error, e:
        print "Error[%d]: %s (%s)" % (e.args[0], e.args[1], dbinfo_str)
        exit(-1)

    if InfluxDB_INFO is not None:
        print "Write '%s' schema table info to influxdb ..." % dbinfo_str
        write_influxdb(*rs)
    else:
        print rs