Python sqlalchemy 模块,engine() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.engine()

项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def delete_obj(self, objects, uow):
        """Called by a UnitOfWork object to delete objects.

        For each table used by this mapper that has primary keys, builds one
        DELETE statement keyed on the primary-key columns and executes it with
        one parameter set per object.

        objects - the objects to delete; objects without an ``_instance_key``
        attribute are skipped.

        uow - the calling UnitOfWork (not used by this method).

        Raises RuntimeError when the engine reports a sane rowcount and the
        number of rows affected differs from the number of objects submitted.
        """
        for table in self.tables:
            if not self._has_pks(table):
                continue
            delete = []
            for obj in objects:
                if not hasattr(obj, "_instance_key"):
                    continue
                # one parameter dictionary per object, keyed by primary-key column
                params = {}
                delete.append(params)
                for col in self.pks_by_table[table]:
                    params[col.key] = self._getattrbycolumn(obj, col)
                self.extension.before_delete(self, obj)
            if delete:
                clause = sql.and_()
                for col in self.pks_by_table[table]:
                    clause.clauses.append(col == sql.bindparam(col.key))
                statement = table.delete(clause)
                c = statement.execute(*delete)
                if table.engine.supports_sane_rowcount() and c.rowcount != len(delete):
                    # A plain string cannot be raised as an exception; raise a
                    # real exception carrying the same diagnostic instead.
                    raise RuntimeError(
                        "ConcurrencyError - deleted rowcount %d does not match number of objects deleted %d"
                        % (c.rowcount, len(delete)))
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def __init__(self, engine, statement, parameters=None, typemap=None, **kwargs):
        """Build a new ANSICompiler object.

        engine - the SQLEngine to compile against.

        statement - the ClauseElement to be compiled.

        parameters - optional dictionary of bind parameters for this Compiled
        object.  They act as the "default" key/value pairs when the Compiled
        is executed and may also influence compilation, e.g. for an INSERT the
        columns inserted correspond to the keys present in the parameters.

        typemap - optional mapping stored as ``self.typemap``; a new empty
        dictionary is used when a falsy value is given.

        **kwargs - accepted but not used by this constructor.
        """
        sql.Compiled.__init__(self, engine, statement, parameters)
        # every bookkeeping structure starts out empty
        self.binds, self.froms, self.wheres, self.strings = {}, {}, {}, {}
        self.select_stack = []
        self.typemap = typemap if typemap else {}
        self.isinsert = False
项目:triage    作者:dssg    | 项目源码 | 文件源码
def retrieve_model_id_from_hash(db_engine, model_hash):
    """Look up the id of the model stored under the given hash.

    A session bound to ``db_engine`` is opened for the lookup and is always
    closed before returning.

    Args:
        db_engine (sqlalchemy.engine) A database engine
        model_hash (str) The model hash to lookup

    Returns: (int) The model id (if found in DB), None (if not)
    """
    make_session = sessionmaker(bind=db_engine)
    session = make_session()
    try:
        lookup = session.query(Model).filter_by(model_hash=model_hash)
        match = lookup.one_or_none()
        if not match:
            return None
        return match.model_id
    finally:
        session.close()
项目:triage    作者:dssg    | 项目源码 | 文件源码
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Each object is written as one CSV row (one value per column of its
    ``__table__``, in column order) to a temporary file, which is then passed
    to ``postgres_copy.copy_from`` together with the type of the first object.

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table.
            An empty collection is a no-op.
    """
    if not db_objects:
        # Nothing to copy, and db_objects[0] below would raise IndexError.
        return

    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        # rewind so the copy reads the rows that were just written
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def build_filter(cls, engine, table, tree):
        """Dispatch the first (operator, nodes) item of ``tree`` to its handler.

        The operator is looked up in ``cls.multiple_operators``, then
        ``cls.binary_operators``, then ``cls.unary_operators``; the first
        table containing it decides which ``_handle_*_op`` method is called
        and its result is returned.

        Raises indexer.QueryError when the first item cannot be extracted
        from ``tree``, and indexer.QueryInvalidOperator when the operator is
        in none of the three tables.
        """
        try:
            operator, nodes = list(tree.items())[0]
        except Exception:
            raise indexer.QueryError()

        try:
            op = cls.multiple_operators[operator]
        except KeyError:
            pass
        else:
            return cls._handle_multiple_op(engine, table, op, nodes)

        try:
            op = cls.binary_operators[operator]
        except KeyError:
            pass
        else:
            return cls._handle_binary_op(engine, table, op, nodes)

        try:
            op = cls.unary_operators[operator]
        except KeyError:
            raise indexer.QueryInvalidOperator(operator)
        return cls._handle_unary_op(engine, op, nodes)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_innodb_tables(self):
        """After db_sync, the schema has tables and all but migrate_version are InnoDB."""
        sa_migration.db_sync(engine=self.migrate_engine)

        schema = {'database': self.migrate_engine.url.database}

        # the schema must contain at least one table
        all_tables_sql = (
            "SELECT count(*) "
            "FROM information_schema.TABLES "
            "WHERE TABLE_SCHEMA = '%(database)s'") % schema
        total = self.migrate_engine.execute(all_tables_sql)
        self.assertGreater(total.scalar(), 0, "No tables found. Wrong schema?")

        # no table other than migrate_version may use a non-InnoDB engine
        non_innodb_sql = (
            "SELECT count(*) "
            "FROM information_schema.TABLES "
            "WHERE TABLE_SCHEMA='%(database)s' "
            "AND ENGINE != 'InnoDB' "
            "AND TABLE_NAME != 'migrate_version'") % schema
        noninnodb = self.migrate_engine.execute(non_innodb_sql)
        count = noninnodb.scalar()
        self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
项目:catwalk    作者:dssg    | 项目源码 | 文件源码
def retrieve_model_id_from_hash(db_engine, model_hash):
    """Look up the id of the model stored under the given hash.

    A session bound to ``db_engine`` is opened for the lookup and is always
    closed before returning.

    Args:
        db_engine (sqlalchemy.engine) A database engine
        model_hash (str) The model hash to lookup

    Returns: (int) The model id (if found in DB), None (if not)
    """
    make_session = sessionmaker(bind=db_engine)
    session = make_session()
    try:
        lookup = session.query(Model).filter_by(model_hash=model_hash)
        match = lookup.one_or_none()
        if not match:
            return None
        return match.model_id
    finally:
        session.close()
项目:catwalk    作者:dssg    | 项目源码 | 文件源码
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Each object is written as one CSV row (one value per column of its
    ``__table__``, in column order) to a temporary file, which is then passed
    to ``postgres_copy.copy_from`` together with the type of the first object.

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table.
            An empty collection is a no-op.
    """
    if not db_objects:
        # Nothing to copy, and db_objects[0] below would raise IndexError.
        return

    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        # rewind so the copy reads the rows that were just written
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _check_002(self, engine, data):
        """Verify columns, indexes, unique constraint and foreign key of instance_mappings."""
        table_name = 'instance_mappings'

        expected_columns = ('created_at', 'updated_at', 'id', 'instance_uuid',
                            'cell_id', 'project_id')
        for column in expected_columns:
            self.assertColumnExists(engine, table_name, column)

        for index in ('instance_uuid_idx', 'project_id_idx'):
            self.assertIndexExists(engine, table_name, index)

        self.assertUniqueConstraintExists(engine, table_name,
                                          ['instance_uuid'])

        # Only the first reported foreign key is inspected; there should be
        # exactly one.
        foreign_keys = reflection.Inspector.from_engine(engine).get_foreign_keys(
            table_name)
        fk = foreign_keys[0]
        self.assertEqual('cell_mappings', fk['referred_table'])
        self.assertEqual(['id'], fk['referred_columns'])
        self.assertEqual(['cell_id'], fk['constrained_columns'])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _check_006(self, engine, data):
        """Verify columns, index, unique constraint and foreign key of build_requests."""
        table_name = 'build_requests'

        expected_columns = (
            'id', 'request_spec_id', 'project_id', 'user_id',
            'display_name', 'instance_metadata', 'progress', 'vm_state',
            'image_ref', 'access_ip_v4', 'access_ip_v6', 'info_cache',
            'security_groups', 'config_drive', 'key_name', 'locked_by',
        )
        for column in expected_columns:
            self.assertColumnExists(engine, table_name, column)

        self.assertIndexExists(engine, table_name,
                               'build_requests_project_id_idx')
        self.assertUniqueConstraintExists(engine, table_name,
                                          ['request_spec_id'])

        # Only the first reported foreign key is inspected; there should be
        # exactly one.
        foreign_keys = reflection.Inspector.from_engine(engine).get_foreign_keys(
            table_name)
        fk = foreign_keys[0]
        self.assertEqual('request_specs', fk['referred_table'])
        self.assertEqual(['id'], fk['referred_columns'])
        self.assertEqual(['request_spec_id'], fk['constrained_columns'])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _check_275(self, engine, data):
        """Verify the String 'type' column on key_pairs and shadow_key_pairs."""
        for table_name in ('key_pairs', 'shadow_key_pairs'):
            self.assertColumnExists(engine, table_name, 'type')

        key_pairs = oslodbutils.get_table(engine, 'key_pairs')
        shadow_key_pairs = oslodbutils.get_table(engine, 'shadow_key_pairs')
        for reflected in (key_pairs, shadow_key_pairs):
            self.assertIsInstance(reflected.c.type.type,
                                  sqlalchemy.types.String)

        # Make sure the keypair entry will have the type 'ssh'
        key_pairs = oslodbutils.get_table(engine, 'key_pairs')
        query = key_pairs.select(key_pairs.c.name == 'test-migr')
        keypair = query.execute().first()
        self.assertEqual('ssh', keypair.type)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _check_313(self, engine, data):
        """Verify the nullable String 'parent_addr' column and its index on pci_devices."""
        for table_name in ('pci_devices', 'shadow_pci_devices'):
            self.assertColumnExists(engine, table_name, 'parent_addr')

        pci_devices = oslodbutils.get_table(engine, 'pci_devices')
        shadow_pci_devices = oslodbutils.get_table(
            engine, 'shadow_pci_devices')

        # both the live and the shadow table get the same column checks
        for reflected in (pci_devices, shadow_pci_devices):
            self.assertIsInstance(reflected.c.parent_addr.type,
                                  sqlalchemy.types.String)
            self.assertTrue(reflected.c.parent_addr.nullable)

        self.assertIndexMembers(
            engine, 'pci_devices',
            'ix_pci_devices_compute_node_id_parent_addr_deleted',
            ['compute_node_id', 'parent_addr', 'deleted'])
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_bind_param(self, value, engine):
        """Return ``psycopg.TimestampFromMx(value)``; ``engine`` is not used."""
        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
        # this one doesnt seem to work with the "emulation" mode
        converted = psycopg.TimestampFromMx(value)
        return converted
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_result_value(self, value, engine):
        """Return ``value`` unchanged; ``engine`` is not used."""
        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
        result = value
        return result
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_bind_param(self, value, engine):
        """Return ``psycopg.DateFromMx(value)``; ``engine`` is not used."""
        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
        # this one doesnt seem to work with the "emulation" mode
        converted = psycopg.DateFromMx(value)
        return converted
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_result_value(self, value, engine):
        """Return ``value`` unchanged; ``engine`` is not used."""
        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
        result = value
        return result
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_bind_param(self, value, engine):
        """Return ``psycopg.TimeFromMx(value)``; ``engine`` is not used."""
        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
        # this one doesnt seem to work with the "emulation" mode
        converted = psycopg.TimeFromMx(value)
        return converted
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def engine(opts, **params):
    """Create a PGSQLEngine from ``opts`` and any extra keyword parameters."""
    new_engine = PGSQLEngine(opts, **params)
    return new_engine
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def engine(opts, **params):
    """Create a MySQLEngine from ``opts`` and any extra keyword parameters."""
    new_engine = MySQLEngine(opts, **params)
    return new_engine
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def _cvt(self, value, engine, fmt):
        if value is None:
            return None
        parts = value.split('.')
        try:
            (value, microsecond) = value.split('.')
            microsecond = int(microsecond)
        except ValueError:
            (value, microsecond) = (value, 0)
        return time.strptime(value, fmt)[0:6] + (microsecond,)
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_result_value(self, value, engine):
        """Turn a "%Y-%m-%d %H:%M:%S" string into a datetime.datetime.

        Whatever falsy value ``self._cvt`` returns (e.g. None) is passed
        through unchanged.
        """
        parsed = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S")
        if not parsed:
            return parsed
        return datetime.datetime(*parsed)
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def convert_result_value(self, value, engine):
        """Turn a "%Y-%m-%d" string into a datetime.date.

        Whatever falsy value ``self._cvt`` returns (e.g. None) is passed
        through unchanged.
        """
        parsed = self._cvt(value, engine, "%Y-%m-%d")
        if not parsed:
            return parsed
        # only year, month and day are relevant for a date
        return datetime.date(*parsed[0:3])
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def engine(opts, **params):
    """Create a SQLiteSQLEngine from ``opts`` and any extra keyword parameters."""
    new_engine = SQLiteSQLEngine(opts, **params)
    return new_engine
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def select_text(self, text, **params):
        """Execute ``text`` against the primary table's engine and return ``self.instances`` of the result.

        ``**params`` are forwarded to the statement's ``execute`` call.
        """
        statement = sql.text(text, engine=self.primarytable.engine)
        result = statement.execute(**params)
        return self.instances(result)
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def engine(**params):
    """Create an ANSISQLEngine from the given keyword parameters."""
    new_engine = ANSISQLEngine(**params)
    return new_engine
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_params(self, **params):
        """Return the type-processed bind parameters for this compiled object.

        The result starts from the value of every bind known to this object,
        then applies overrides from ``self.parameters`` (if set) and finally
        from the given ``**params``.  Keys in the overrides that match no
        bind of this object are ignored.

        The result is always a dictionary keyed by bind key.  When the engine
        is positional it is an OrderedDict filled in ``self.positiontup``
        order, so iterating it follows the bind positions of the compiled
        statement.

        For an executemany style of call, this method should be called for
        each element in the list of parameter groups that will ultimately be
        executed.
        """
        if self.parameters is not None:
            bindparams = self.parameters.copy()
        else:
            bindparams = {}
        bindparams.update(params)

        if self.engine.positional:
            d = OrderedDict()
            for k in self.positiontup:
                b = self.binds[k]
                d[k] = b.typeprocess(b.value, self.engine)
        else:
            d = {}
            for b in self.binds.values():
                d[b.key] = b.typeprocess(b.value, self.engine)

        # items() rather than iteritems() so this runs on Python 2 and 3
        for key, value in bindparams.items():
            try:
                b = self.binds[key]
            except KeyError:
                continue
            d[b.key] = b.typeprocess(value, self.engine)

        return d
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_named_params(self, parameters):
        """Return ``parameters`` in dictionary form.

        For a non-positional engine the given object is returned as is.  For
        a positional engine, ``parameters`` is indexed by position and a new
        dictionary mapping each name in ``self.positiontup`` to the value at
        the same position is returned.
        """
        if not self.engine.positional:
            return parameters

        named = {}
        for position, name in enumerate(self.positiontup):
            named[name] = parameters[position]
        return named
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def bindparam_string(self, name):
        """Format ``name`` with the engine's ``bindtemplate`` and return the result."""
        template = self.engine.bindtemplate
        return template % name
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_column_default_string(self, column):
        """Return the string form of a column's PassiveDefault, or None.

        A ``str`` argument is returned as its ``repr``; any other argument is
        compiled against ``self.engine`` and converted with ``str``.  Columns
        whose default is not a ``schema.PassiveDefault`` yield None.
        """
        default = column.default
        if not isinstance(default, schema.PassiveDefault):
            return None
        if isinstance(default.arg, str):
            return repr(default.arg)
        return str(default.arg.compile(self.engine))
项目:pyuniprot    作者:cebel    | 项目源码 | 文件源码
def __init__(self, connection=None, echo=False):
        """Set up file logging and try to open the database connection.

        On success this sets ``connection``, ``engine``, ``inspector``,
        ``sessionmaker`` and ``session`` on the instance.  If any step of the
        connection setup raises, a warning (with traceback) is logged and the
        attributes not yet assigned stay unset.

        :param str connection: SQLAlchemy
        :param bool echo: True or False for SQL output of SQLAlchemy engine
        """
        log.setLevel(logging.INFO)

        handler = logging.FileHandler(os.path.join(PYUNIPROT_DIR, defaults.TABLE_PREFIX + 'database.log'))
        handler.setLevel(logging.INFO)

        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        log.addHandler(handler)

        try:
            self.connection = get_connection_string(connection)
            self.engine = sqlalchemy.create_engine(self.connection, echo=echo)

            self.inspector = reflection.Inspector.from_engine(self.engine)
            self.sessionmaker = sessionmaker(
                bind=self.engine,
                autoflush=False,
                autocommit=False,
                expire_on_commit=True
            )
            self.session = scoped_session(self.sessionmaker)
        except Exception:
            # Stay best-effort, but let KeyboardInterrupt/SystemExit propagate
            # and keep the traceback so the cause can be diagnosed.
            log.warning('No valid database connection. Execute `pyuniprot connection` on command line',
                        exc_info=True)
项目:pyuniprot    作者:cebel    | 项目源码 | 文件源码
def _create_tables(self, checkfirst=True):
        """Create all tables of the models' metadata on this manager's engine.

        :param checkfirst: forwarded to ``metadata.create_all``; True or False
            check if tables already exists
        :type checkfirst: bool
        :return:
        """
        target = self.engine
        log.info('create tables in {}'.format(target.url))
        metadata = models.Base.metadata
        metadata.create_all(target, checkfirst=checkfirst)
项目:pyuniprot    作者:cebel    | 项目源码 | 文件源码
def _drop_tables(self):
        """Drop all tables of the models' metadata, committing the session before and after."""
        target = self.engine
        log.info('drop tables in {}'.format(target.url))
        self.session.commit()
        metadata = models.Base.metadata
        metadata.drop_all(target)
        self.session.commit()
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_basic_query(self, engine, conn):
        """Selecting from one_row yields one row with the single column number_of_rows == 1."""
        result = conn.execute('SELECT * FROM one_row')
        fetched = result.fetchall()
        self.assertEqual(len(fetched), 1)
        only_row = fetched[0]
        self.assertEqual(only_row.number_of_rows, 1)
        self.assertEqual(len(only_row), 1)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_no_such_table(self, engine, conn):
        """Autoloading a missing table raises NoSuchTableError, with and without a schema."""
        def reflect_missing():
            return Table('this_does_not_exist', MetaData(bind=engine), autoload=True)

        def reflect_missing_in_schema():
            return Table('this_does_not_exist', MetaData(bind=engine),
                         schema='also_does_not_exist', autoload=True)

        self.assertRaises(NoSuchTableError, reflect_missing)
        self.assertRaises(NoSuchTableError, reflect_missing_in_schema)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table(self, engine, connection):
        """Autoloading one_row reflects exactly one column, number_of_rows."""
        metadata = MetaData(bind=engine)
        reflected = Table('one_row', metadata, autoload=True)
        self.assertEqual(len(reflected.c), 1)
        self.assertIsNotNone(reflected.c.number_of_rows)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table_with_schema(self, engine, connection):
        """Autoloading one_row with an explicit schema reflects its single column."""
        metadata = MetaData(bind=engine)
        reflected = Table('one_row', metadata, schema=SCHEMA, autoload=True)
        self.assertEqual(len(reflected.c), 1)
        self.assertIsNotNone(reflected.c.number_of_rows)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table_include_columns(self, engine, connection):
        """Reflecting with include_columns=['col_int'] loads only that column."""
        table = Table('one_row_complex', MetaData(bind=engine))
        engine.dialect.reflecttable(
            connection, table, include_columns=['col_int'], exclude_columns=[])

        def read_excluded_column():
            return table.c.col_tinyint

        self.assertEqual(len(table.c), 1)
        self.assertIsNotNone(table.c.col_int)
        self.assertRaises(AttributeError, read_excluded_column)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_schemas(self, engine, connection):
        """The inspector lists both SCHEMA and 'default' among the schema names."""
        schema_names = sqlalchemy.inspect(engine).get_schema_names()
        for expected in (SCHEMA, 'default'):
            self.assertIn(expected, schema_names)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_get_table_names(self, engine, connection):
        """Reflected metadata and the inspector both report the expected tables."""
        metadata = MetaData()
        metadata.reflect(bind=engine)
        print(metadata.tables)
        for table_name in ('one_row', 'one_row_complex'):
            self.assertIn(table_name, metadata.tables)

        inspector = sqlalchemy.inspect(engine)
        schema_tables = inspector.get_table_names(schema=SCHEMA)
        self.assertIn('many_rows', schema_tables)
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_has_table(self, engine, connection):
        """Table.exists() is true for one_row and false for an unknown table."""
        present = Table('one_row', MetaData(bind=engine))
        self.assertTrue(present.exists())
        absent = Table('this_table_does_not_exist', MetaData(bind=engine))
        self.assertFalse(absent.exists())
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_char_length(self, engine, connection):
        """char_length(col_string) equals the length of 'a string'."""
        table = Table('one_row_complex', MetaData(bind=engine), autoload=True)
        length_expr = sqlalchemy.func.char_length(table.c.col_string)
        query = sqlalchemy.select([length_expr])
        result = query.execute().scalar()
        self.assertEqual(result, len('a string'))
项目:PyAthena    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_select(self, engine, connection):
        """Reflect one_row_complex, then check its single row and every column type."""
        one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
        columns = one_row_complex.c
        self.assertEqual(len(columns), 15)
        self.assertIsInstance(columns.col_string, Column)

        rows = one_row_complex.select().execute().fetchall()
        self.assertEqual(len(rows), 1)
        expected_row = [
            True,
            127,
            32767,
            2147483647,
            9223372036854775807,
            0.5,
            0.25,
            'a string',
            datetime(2017, 1, 1, 0, 0, 0),
            date(2017, 1, 2),
            b'123',
            '[1, 2]',
            '{1=2, 3=4}',
            '{a=1, b=2}',
            Decimal('0.1'),
        ]
        self.assertEqual(list(rows[0]), expected_row)

        # column name -> type its reflected column type must be an instance of
        string_type = type(STRINGTYPE)
        expected_types = [
            ('col_boolean', BOOLEAN),
            ('col_tinyint', INTEGER),
            ('col_smallint', INTEGER),
            ('col_int', INTEGER),
            ('col_bigint', BIGINT),
            ('col_float', FLOAT),
            ('col_double', FLOAT),
            ('col_string', string_type),
            ('col_timestamp', TIMESTAMP),
            ('col_date', DATE),
            ('col_binary', BINARY),
            ('col_array', string_type),
            ('col_map', string_type),
            ('col_struct', string_type),
            ('col_decimal', DECIMAL),
        ]
        for column_name, expected_type in expected_types:
            self.assertIsInstance(getattr(columns, column_name).type, expected_type)
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def patch():
    """Wrap ``create_engine`` once, using a flag on ``sqlalchemy.engine`` to skip repeat calls."""
    already_patched = getattr(sqlalchemy.engine, '__datadog_patch', False)
    if already_patched:
        return
    setattr(sqlalchemy.engine, '__datadog_patch', True)

    # patch the engine creation function under both import paths
    for module_path in ('sqlalchemy', 'sqlalchemy.engine'):
        _w(module_path, 'create_engine', _wrap_create_engine)
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def unpatch():
    """Unwrap ``create_engine`` if the patch flag on ``sqlalchemy.engine`` is set; otherwise do nothing."""
    if not getattr(sqlalchemy.engine, '__datadog_patch', False):
        return

    # unpatch sqlalchemy
    setattr(sqlalchemy.engine, '__datadog_patch', False)
    for module in (sqlalchemy, sqlalchemy.engine):
        unwrap(module, 'create_engine')
项目:unsonic    作者:redshodan    | 项目源码 | 文件源码
def init(settings, webapp=False, db_info=None):
    """Set the module-level database globals and copy the SQLAlchemy options into ``settings``.

    The database URL comes from ``db_info.url`` when ``db_info`` is given,
    otherwise from ``_dbUrl(web.CONFIG)``; in the latter case ``db_info`` is
    created with ``dbinit``.  The URL is written to ``settings`` and to the
    "mishmash" section of ``web.CONFIG``, and ``initAlembic`` is called with
    it at the end.  ``webapp`` is not used here.
    """
    global db_url, db_engine, session_maker

    db_url = db_info.url if db_info else _dbUrl(web.CONFIG)

    settings["sqlalchemy.url"] = db_url
    web.CONFIG.set("mishmash", "sqlalchemy.url", settings["sqlalchemy.url"])

    # mirror the remaining sqlalchemy options from the config into settings
    for option in ("sqlalchemy.convert_unicode", "sqlalchemy.encoding"):
        settings[option] = web.CONFIG.get("mishmash", option)

    if not db_info:
        config = Namespace()
        config.db_url = db_url
        config.various_artists_name = web.CONFIG.get("mishmash",
                                                     "various_artists_name")
        db_info = dbinit(config.db_url)

    db_engine, session_maker = db_info.engine, db_info.SessionMaker

    initAlembic(db_url)
项目:PyAthenaJDBC    作者:laughingman7743    | 项目源码 | 文件源码
def test_basic_query(self, engine, conn):
        """Selecting from one_row yields one row with the single column number_of_rows == 1."""
        result = conn.execute('SELECT * FROM one_row')
        fetched = result.fetchall()
        self.assertEqual(len(fetched), 1)
        only_row = fetched[0]
        self.assertEqual(only_row.number_of_rows, 1)
        self.assertEqual(len(only_row), 1)
项目:PyAthenaJDBC    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_no_such_table(self, engine, conn):
        """Autoloading a missing table raises NoSuchTableError, with and without a schema."""
        def reflect_missing():
            return Table('this_does_not_exist', MetaData(bind=engine), autoload=True)

        def reflect_missing_in_schema():
            return Table('this_does_not_exist', MetaData(bind=engine),
                         schema='also_does_not_exist', autoload=True)

        self.assertRaises(NoSuchTableError, reflect_missing)
        self.assertRaises(NoSuchTableError, reflect_missing_in_schema)
项目:PyAthenaJDBC    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table(self, engine, connection):
        """Autoloading one_row reflects exactly one column, number_of_rows."""
        metadata = MetaData(bind=engine)
        reflected = Table('one_row', metadata, autoload=True)
        self.assertEqual(len(reflected.c), 1)
        self.assertIsNotNone(reflected.c.number_of_rows)
项目:PyAthenaJDBC    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table_with_schema(self, engine, connection):
        """Autoloading one_row with an explicit schema reflects its single column."""
        metadata = MetaData(bind=engine)
        reflected = Table('one_row', metadata, schema=SCHEMA, autoload=True)
        self.assertEqual(len(reflected.c), 1)
        self.assertIsNotNone(reflected.c.number_of_rows)
项目:PyAthenaJDBC    作者:laughingman7743    | 项目源码 | 文件源码
def test_reflect_table_include_columns(self, engine, connection):
        """Reflecting with include_columns=['col_int'] loads only that column."""
        table = Table('one_row_complex', MetaData(bind=engine))
        engine.dialect.reflecttable(
            connection, table, include_columns=['col_int'], exclude_columns=[])

        def read_excluded_column():
            return table.c.col_tinyint

        self.assertEqual(len(table.c), 1)
        self.assertIsNotNone(table.c.col_int)
        self.assertRaises(AttributeError, read_excluded_column)