Python MySQLdb 模块,NotSupportedError() 实例源码

我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用MySQLdb.NotSupportedError()

项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def select_group_ids(self, resource_name, timestamp):
        """Select the group ids from a snapshot table.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
                YYYYMMDDTHHMMSSZ.

        Returns:
            list: A list of group ids.

        Raises:
            MySQLError: When an error has occured while executing the query.
        """
        try:
            group_ids_sql = select_data.GROUP_IDS.format(timestamp)
            cursor = self.conn.cursor(cursorclass=cursors.DictCursor)
            cursor.execute(group_ids_sql)
            rows = cursor.fetchall()
            return [row['group_id'] for row in rows]
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            raise MySQLError(resource_name, e)
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def execute_sql_with_fetch(self, resource_name, sql, values):
        """Executes a provided sql statement with fetch.

        Args:
            resource_name (str): String of the resource name.
            sql (str): String of the sql statement.
            values (tuple): Tuple of string for sql placeholder values.

        Returns:
            list: A list of dict representing rows of sql query result.

        Raises:
            MySQLError: When an error has occured while executing the query.
        """
        try:
            cursor = self.conn.cursor(cursorclass=cursors.DictCursor)
            cursor.execute(sql, values)
            return cursor.fetchall()
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            raise MySQLError(resource_name, e)
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def execute_sql_with_commit(self, resource_name, sql, values):
        """Executes a provided sql statement with commit.

        Args:
            resource_name (str): String of the resource name.
            sql (str): String of the sql statement.
            values (tuple): Tuple of string for sql placeholder values.

        Raises:
            MySQLError: When an error has occured while executing the query.
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql, values)
            self.conn.commit()
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            raise MySQLError(resource_name, e)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:trydjango18    作者:lucifer-yqh    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_buckets_acls(self, resource_name, timestamp):
        """Select the bucket acls from a bucket acls snapshot table.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
                YYYYMMDDTHHMMSSZ.

        Returns:
            list: List of bucket acls.

        Raises:
            MySQLError: An error with MySQL has occurred.
        """
        bucket_acls = {}
        cnt = 0
        try:
            bucket_acls_sql = select_data.BUCKET_ACLS.format(timestamp)
            rows = self.execute_sql_with_fetch(resource_name,
                                               bucket_acls_sql,
                                               None)
            for row in rows:
                bucket_acl = bkt_acls.\
                BucketAccessControls(bucket=row['bucket'],
                                     entity=row['entity'],
                                     email=row['email'],
                                     domain=row['domain'],
                                     role=row['role'],
                                     project_number=row['project_number'])
                bucket_acls[cnt] = bucket_acl
                cnt += 1
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            LOGGER.error(errors.MySQLError(resource_name, e))
        return bucket_acls
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_bigquery_acls(self, resource_name, timestamp):
        """Select the Big Query acls from a Big Query acls snapshot table.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
            YYYYMMDDTHHMMSSZ.

        Returns:
            dict: Dictionary keyed by the count of ACLs and then the ACLs.

        Raises:
            MySQLError: An error with MySQL has occurred.
        """
        bigquery_acls = {}
        cnt = 0
        try:
            bigquery_acls_sql = select_data.BIGQUERY_ACLS.format(timestamp)
            rows = self.execute_sql_with_fetch(resource_name,
                                               bigquery_acls_sql,
                                               None)
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            LOGGER.error(errors.MySQLError(resource_name, e))

        for row in rows:
            bigquery_acl = bq_acls.BigqueryAccessControls(
                dataset_id=row['dataset_id'],
                special_group=row['access_special_group'],
                user_email=row['access_user_by_email'],
                domain=row['access_domain'],
                role=row['role'],
                group_email=row['access_group_by_email'],
                project_id=row['project_id'])
            bigquery_acls[cnt] = bigquery_acl
            cnt += 1

        return bigquery_acls
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def _get_cloudsql_instance_acl_map(self, resource_name, timestamp):
        """Create CloudSQL instance acl map.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
                YYYYMMDDTHHMMSSZ.

        Returns:
            dict: Map of instance acls.

        Raises:
            MySQLError: An error with MySQL has occurred.
        """
        cloudsql_acls_map = {}
        try:
            cloudsql_acls_sql = select_data.CLOUDSQL_ACLS.format(timestamp)
            rows = self.execute_sql_with_fetch(resource_name,
                                               cloudsql_acls_sql,
                                               None)
            for row in rows:
                acl_list = []
                project_number = row['project_number']
                instance_name = row['instance_name']
                network = row['value']
                hash_key = hash(str(project_number) + ',' + instance_name)
                if hash_key in cloudsql_acls_map:
                    if network not in cloudsql_acls_map[hash_key]:
                        cloudsql_acls_map[hash_key].append(network)
                else:
                    acl_list.append(network)
                    cloudsql_acls_map[hash_key] = acl_list
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            LOGGER.error(errors.MySQLError(resource_name, e))

        return cloudsql_acls_map
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def load_data(self, resource_name, timestamp, data):
        """Load data into a snapshot table.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
                YYYYMMDDTHHMMSSZ.
            data (iterable): An iterable or a list of data to be uploaded.

        Raises:
            MySQLError: When an error has occured while executing the query.
        """
        with csv_writer.write_csv(resource_name, data) as csv_file:
            try:
                snapshot_table_name = self._create_snapshot_table_name(
                    resource_name, timestamp)
                load_data_sql = load_data_sql_provider.provide_load_data_sql(
                    resource_name, csv_file.name, snapshot_table_name)
                LOGGER.debug('SQL: %s', load_data_sql)
                cursor = self.conn.cursor()
                cursor.execute(load_data_sql)
                self.conn.commit()
                # TODO: Return the snapshot table name so that it can be tracked
                # in the main snapshot table.
            except (DataError, IntegrityError, InternalError,
                    NotSupportedError, OperationalError,
                    ProgrammingError) as e:
                raise MySQLError(resource_name, e)
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_latest_snapshot_timestamp(self, statuses):
        """Select the latest timestamp of the completed snapshot.

        Args:
            statuses (tuple): The tuple of snapshot statuses to filter on.

        Returns:
            str: The string timestamp of the latest complete snapshot.

        Raises:
            MySQLError: When no rows are found.
        """
        # Build a dynamic parameterized query string for filtering the
        # snapshot statuses
        if not isinstance(statuses, tuple):
            statuses = ('SUCCESS',)

        status_params = ','.join(['%s']*len(statuses))
        filter_clause = SNAPSHOT_STATUS_FILTER_CLAUSE.format(status_params)
        try:
            cursor = self.conn.cursor()
            cursor.execute(
                select_data.LATEST_SNAPSHOT_TIMESTAMP + filter_clause, statuses)
            row = cursor.fetchone()
            if row:
                return row[0]
            raise NoResultsError('No snapshot cycle found.')
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError, NoResultsError) as e:
            raise MySQLError('snapshot_cycles', e)
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def _rollback(self):
        try:
            BaseDatabaseWrapper._rollback(self)
        except Database.NotSupportedError:
            pass
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_cloudsql_acls(self, resource_name, timestamp):
        """Select the cloudsql acls for project from a snapshot table.

        Args:
            resource_name (str): String of the resource name.
            timestamp (str): String of timestamp, formatted as
                YYYYMMDDTHHMMSSZ.

        Returns:
            list: List of cloudsql acls.

        Raises:
            MySQLError: An error with MySQL has occurred.
        """
        cloudsql_acls = {}
        cnt = 0
        try:
            cloudsql_instances_sql = (
                select_data.CLOUDSQL_INSTANCES.format(timestamp))
            rows = self.execute_sql_with_fetch(resource_name,
                                               cloudsql_instances_sql,
                                               None)
            acl_map = self._get_cloudsql_instance_acl_map(resource_name,
                                                          timestamp)

            for row in rows:
                project_number = row['project_number']
                instance_name = row['name']
                ssl_enabled = row['settings_ip_configuration_require_ssl']
                authorized_networks = self.\
                _get_networks_for_instance(acl_map,
                                           project_number,
                                           instance_name)

                cloudsql_acl = csql_acls.\
                CloudSqlAccessControl(instance_name=instance_name,
                                      authorized_networks=authorized_networks,
                                      ssl_enabled=ssl_enabled,
                                      project_number=project_number)
                cloudsql_acls[cnt] = cloudsql_acl
                cnt += 1
        except (DataError, IntegrityError, InternalError, NotSupportedError,
                OperationalError, ProgrammingError) as e:
            LOGGER.error(errors.MySQLError(resource_name, e))
        return cloudsql_acls