Python sqlalchemy 模块,MetaData() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.MetaData()

项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def __init__(self, engine):

        self.engine = engine
        metadata = sa.MetaData(bind=engine)
        metadata.reflect(only=asset_db_table_names)
        for table_name in asset_db_table_names:
            setattr(self, table_name, metadata.tables[table_name])

        # Check the version info of the db for compatibility
        check_version_info(self.version_info, ASSET_DB_VERSION)

        # Cache for lookup of assets by sid, the objects in the asset lookup
        # may be shared with the results from equity and future lookup caches.
        #
        # The top level cache exists to minimize lookups on the asset type
        # routing.
        #
        # The caches are read through, i.e. accessing an asset through
        # retrieve_asset will populate the cache on first retrieval.
        self._caches = (self._asset_cache, self._asset_type_cache) = {}, {}

        # Populated on first call to `lifetimes`.
        self._asset_lifetimes = None
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_write_version(self):
        env = TradingEnvironment(load=noop_load)
        metadata = sa.MetaData(bind=env.engine)
        version_table = _version_table_schema(metadata)
        version_table.delete().execute()

        # Assert that the version is not present in the table
        self.assertIsNone(sa.select((version_table.c.version,)).scalar())

        # This should fail because the table has no version info and is,
        # therefore, consdered v0
        with self.assertRaises(AssetDBVersionError):
            check_version_info(version_table, -2)

        # This should not raise an error because the version has been written
        write_version_info(version_table, -2)
        check_version_info(version_table, -2)

        # Assert that the version is in the table and correct
        self.assertEqual(sa.select((version_table.c.version,)).scalar(), -2)

        # Assert that trying to overwrite the version fails
        with self.assertRaises(sa.exc.IntegrityError):
            write_version_info(version_table, -3)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_finder_checks_version(self):
        # Create an env and give it a bogus version number
        env = TradingEnvironment(load=noop_load)
        metadata = sa.MetaData(bind=env.engine)
        version_table = _version_table_schema(metadata)
        version_table.delete().execute()
        write_version_info(version_table, -2)
        check_version_info(version_table, -2)

        # Assert that trying to build a finder with a bad db raises an error
        with self.assertRaises(AssetDBVersionError):
            AssetFinder(engine=env.engine)

        # Change the version number of the db to the correct version
        version_table.delete().execute()
        write_version_info(version_table, ASSET_DB_VERSION)
        check_version_info(version_table, ASSET_DB_VERSION)

        # Now that the versions match, this Finder should succeed
        AssetFinder(engine=env.engine)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`,
        :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:tuning-box    作者:openstack    | 项目源码 | 文件源码
def _get_autobase(table_prefix, bind):
    metadata = sa.MetaData(bind=bind)
    table_name = table_prefix + 'environment_hierarchy_level_value'
    metadata.reflect(only=[table_name])
    AutoBase = automap.automap_base(metadata=metadata)

    def classname_for_table(base, refl_table_name, table):
        assert refl_table_name.startswith(table_prefix)
        noprefix_name = refl_table_name[len(table_prefix):]
        uname = u"".join(s.capitalize() for s in noprefix_name.split('_'))
        if not isinstance(uname, str):
            return uname.encode('utf-8')
        else:
            return uname

    AutoBase.prepare(classname_for_table=classname_for_table)
    return AutoBase
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def get_columns_from_etl_table(self):
        try:
            extra = {}
            meta = MetaData(**extra.get('metadata_params', {}))
            table = Table(
                self.sql_table_name, meta,
                schema=self.schema or None,
                autoload=True,
                autoload_with=self.local_engine)

        except Exception:

            raise Exception(
                "Table doesn't seem to exist in the specified database, "
                "couldn't fetch column information")

        return len(table.columns)
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def _gen_sa_table(sectype, metadata=None):
    """Generate SQLAlchemy Table object by sectype.
    """
    if metadata is None:
        metadata = MetaData()
    table = Table(
        sectype, metadata,
        Column('Symbol', String(20), primary_key=True),
        Column('DataType', String(20), primary_key=True),
        Column('BarSize', String(10), primary_key=True),
        Column('TickerTime', DateTime(), primary_key=True),
        Column('opening', Float(10, 2)),
        Column('high', Float(10, 2)),
        Column('low', Float(10, 2)),
        Column('closing', Float(10, 2)),
        Column('volume', mysqlINTEGER(unsigned=True)),
        Column('barcount', mysqlINTEGER(unsigned=True)),
        Column('average', Float(10, 2))
    )
    return table
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def db_version():
    repository = _find_migrate_repo()
    try:
        return versioning_api.db_version(get_engine(), repository)
    except versioning_exceptions.DatabaseNotControlledError:
        meta = sqlalchemy.MetaData()
        engine = get_engine()
        meta.reflect(bind=engine)
        tables = meta.tables
        if len(tables) == 0:
            db_version_control(version.INIT_VERSION)
            return versioning_api.db_version(get_engine(), repository)
        else:
            # Some pre-Essex DB's may not be version controlled.
            # Require them to upgrade using Essex first.
            raise exception.WeiboException(
                _("Upgrade DB using Essex release first."))
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _init_class(cls):
        if cls.run_define_tables == 'each':
            if cls.run_create_tables == 'once':
                cls.run_create_tables = 'each'
            assert cls.run_inserts in ('each', None)

        if cls.other is None:
            cls.other = adict()

        if cls.tables is None:
            cls.tables = adict()

        if cls.bind is None:
            setattr(cls, 'bind', cls.setup_bind())

        if cls.metadata is None:
            setattr(cls, 'metadata', sa.MetaData())

        if cls.metadata.bind is None:
            cls.metadata.bind = cls.bind
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:flask-boilerplate    作者:MarcFord    | 项目源码 | 文件源码
def __init__(self):
        super(AlchemyBase, self).__init__()

        def fk_fixed_width(constraint, table):
            str_tokens = [table.name] +\
                         [element.parent.name for element in constraint.elements] +\
                         [element.target_fullname for element in constraint.elements]
            guid = uuid.uuid5(uuid.NAMESPACE_OID, "_".join(str_tokens).encode('ascii'))
            return str(guid)

        convention = {
            "fk_fixed_width": fk_fixed_width,
            "ix": 'ix_%(column_0_label)s',
            "uq": "uq_%(table_name)s_%(column_0_name)s",
            "ck": "ck_%(table_name)s_%(column_0_name)s",
            "fk": "fk_%(fk_fixed_width)s",
            "pk": "pk_%(table_name)s"
        }
        metadata = MetaData(naming_convention=convention)
        self.Model = declarative_base(metadata=metadata, cls=Model, name='Model', metaclass=_BoundDeclarativeMeta)
        self.Model.query = _QueryProperty(self)
项目:ozelot    作者:trycs    | 项目源码 | 文件源码
def test03(self):
        """Create two tables, drop one, check the other is still there
        """
        MyModelA().create_table(self.client)
        MyModelB().create_table(self.client)

        # now there are two tables now
        metadata = sa.MetaData()
        metadata.reflect(self.client.get_engine())
        self.assertEqual(len(metadata.tables), 2)

        MyModelA().drop_table(self.client)

        # one table left
        metadata = sa.MetaData()
        metadata.reflect(self.client.get_engine())
        self.assertEqual(len(metadata.tables), 1)
        self.assertEqual(list(metadata.tables.keys())[0], 'mymodelb')
项目:ozelot    作者:trycs    | 项目源码 | 文件源码
def test04(self):
        """Create a table and populate it with some objects
        """
        MyModelB().create_table(self.client)

        # create some objects
        session = self.client.create_session()
        session.add(MyModelB(my_other_field=17))
        session.add(MyModelB(my_other_field=18))
        session.add(MyModelB(my_other_field=19))
        session.commit()

        metadata = sa.MetaData()
        metadata.reflect(self.client.get_engine())
        query = select([func.count()]).select_from(metadata.tables['mymodelb'])
        count = self.client.get_engine().execute(query).scalar()
        self.assertEqual(count, 3)

        session.close()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _init_class(cls):
        if cls.run_define_tables == 'each':
            if cls.run_create_tables == 'once':
                cls.run_create_tables = 'each'
            assert cls.run_inserts in ('each', None)

        if cls.other is None:
            cls.other = adict()

        if cls.tables is None:
            cls.tables = adict()

        if cls.bind is None:
            setattr(cls, 'bind', cls.setup_bind())

        if cls.metadata is None:
            setattr(cls, 'metadata', sa.MetaData())

        if cls.metadata.bind is None:
            cls.metadata.bind = cls.bind
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def upgrade(migrate_engine):
    meta = sqlalchemy.MetaData()
    meta.bind = migrate_engine

    job = sqlalchemy.Table(
        'job', meta,
        sqlalchemy.Column('id', sqlalchemy.String(50),
                          primary_key=True, nullable=False),
        sqlalchemy.Column('scheduler_id', sqlalchemy.String(36),
                          nullable=False),
        sqlalchemy.Column('job_type', sqlalchemy.String(10),
                          nullable=False),
        sqlalchemy.Column('parameters', types.Dict),
        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
        mysql_engine='InnoDB',
        mysql_charset='utf8'
    )

    try:
        job.create()
    except Exception:
        LOG.error("Table |%s| not created!", repr(job))
        raise
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`,
        :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _init_class(cls):
        if cls.run_define_tables == 'each':
            if cls.run_create_tables == 'once':
                cls.run_create_tables = 'each'
            assert cls.run_inserts in ('each', None)

        if cls.other is None:
            cls.other = adict()

        if cls.tables is None:
            cls.tables = adict()

        if cls.bind is None:
            setattr(cls, 'bind', cls.setup_bind())

        if cls.metadata is None:
            setattr(cls, 'metadata', sa.MetaData())

        if cls.metadata.bind is None:
            cls.metadata.bind = cls.bind
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_fancy_coltypes(self):
        Table(
            'simple_items', self.metadata,
            Column('enum', ENUM('A', 'B', name='blah')),
            Column('bool', BOOLEAN),
            Column('number', NUMERIC(10, asdecimal=False)),
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, Enum, MetaData, Numeric, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('enum', Enum('A', 'B', name='blah')),
    Column('bool', Boolean),
    Column('number', Numeric(10, asdecimal=False))
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_boolean_detection(self):
        Table(
            'simple_items', self.metadata,
            Column('bool1', INTEGER),
            Column('bool2', SMALLINT),
            Column('bool3', TINYINT),
            CheckConstraint('simple_items.bool1 IN (0, 1)'),
            CheckConstraint('simple_items.bool2 IN (0, 1)'),
            CheckConstraint('simple_items.bool3 IN (0, 1)')
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('bool1', Boolean),
    Column('bool2', Boolean),
    Column('bool3', Boolean)
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_enum_detection(self):
        Table(
            'simple_items', self.metadata,
            Column('enum', VARCHAR(255)),
            CheckConstraint(r"simple_items.enum IN ('A', '\'B', 'C')")
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Enum, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('enum', Enum('A', "\\\\'B", 'C'))
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_column_adaptation(self):
        Table(
            'simple_items', self.metadata,
            Column('id', BIGINT),
            Column('length', DOUBLE_PRECISION)
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import BigInteger, Column, Float, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', BigInteger),
    Column('length', Float)
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_constraints_table(self):
        Table(
            'simple_items', self.metadata,
            Column('id', INTEGER),
            Column('number', INTEGER),
            CheckConstraint('number > 0'),
            UniqueConstraint('id', 'number')
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table, UniqueConstraint

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', Integer),
    Column('number', Integer),
    CheckConstraint('number > 0'),
    UniqueConstraint('id', 'number')
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_noindexes_table(self):
        simple_items = Table(
            'simple_items', self.metadata,
            Column('number', INTEGER),
            CheckConstraint('number > 2')
        )
        simple_items.indexes.add(Index('idx_number', simple_items.c.number))

        assert self.generate_code(noindexes=True) == """\
# coding: utf-8
from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('number', Integer),
    CheckConstraint('number > 2')
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_no_classes(self):
        Table(
            'simple_items', self.metadata,
            Column('id', INTEGER, primary_key=True)
        )

        assert self.generate_code(noclasses=True) == """\
# coding: utf-8
from sqlalchemy import Column, Integer, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', Integer, primary_key=True)
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_schema_boolean(self):
        Table(
            'simple_items', self.metadata,
            Column('bool1', INTEGER),
            CheckConstraint('testschema.simple_items.bool1 IN (0, 1)'),
            schema='testschema'
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('bool1', Boolean),
    schema='testschema'
)
"""
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def test_foreign_key_options(self):
        Table(
            'simple_items', self.metadata,
            Column('name', VARCHAR, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE',
                                               deferrable=True, initially='DEFERRED'))
        )

        assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, ForeignKey, MetaData, String, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('name', String, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', \
deferrable=True, initially='DEFERRED'))
)
"""
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`,
        :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _set_table(self, table):
        if isinstance(table, basestring):
            if self.alter_metadata:
                if not self.meta:
                    raise ValueError("metadata must be specified for table"
                        " reflection when using alter_metadata")
                meta = self.meta
                if self.engine:
                    meta.bind = self.engine
            else:
                if not self.engine and not self.meta:
                    raise ValueError("engine or metadata must be specified"
                        " to reflect tables")
                if not self.engine:
                    self.engine = self.meta.bind
                meta = sqlalchemy.MetaData(bind=self.engine)
            self._table = sqlalchemy.Table(table, meta, autoload=True)
        elif isinstance(table, sqlalchemy.Table):
            self._table = table
            if not self.alter_metadata:
                self._table.meta = sqlalchemy.MetaData(bind=self._table.bind)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def getDiffOfModelAgainstDatabase(metadata, engine, excludeTables=None):
    """
    Return differences of model against database.

    :return: object which will evaluate to :keyword:`True` if there \
      are differences else :keyword:`False`.
    """
    db_metadata = sqlalchemy.MetaData(engine, reflect=True)

    # sqlite will include a dynamically generated 'sqlite_sequence' table if
    # there are autoincrement sequences in the database; this should not be
    # compared.
    if engine.dialect.name == 'sqlite':
        if 'sqlite_sequence' in db_metadata.tables:
            db_metadata.remove(db_metadata.tables['sqlite_sequence'])

    return SchemaDiff(metadata, db_metadata,
                      labelA='model',
                      labelB='database',
                      excludeTables=excludeTables)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        super(Test4fc07b41d45c, self).setUp()
        self.previous_revision = "42a3c8c0db75"
        self.current_revision = "4fc07b41d45c"
        self.metadata = sa.MetaData(bind=self.engine)
        # NOTE(thomasem): Create a quark_ip_addresses table that has an
        # identical schema as the revision before it for the columns this data
        # migration is concerned with.
        self.ip_addresses_table = sa.Table(
            'quark_ip_addresses', self.metadata,
            sa.Column('id', sa.String(length=36), primary_key=True),
            sa.Column('_deallocated', sa.Boolean()),
            sa.Column('address_type', sa.Enum('fixed', 'shared', 'floating'))
        )
        self.metadata.create_all()
        alembic_command.stamp(self.config, self.previous_revision)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def bind(self):
        """Return the current "bind".

        In online mode, this is an instance of
        :class:`sqlalchemy.engine.Connection`, and is suitable
        for ad-hoc execution of any kind of usage described
        in :ref:`sqlexpression_toplevel` as well as
        for usage with the :meth:`sqlalchemy.schema.Table.create`
        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
        of :class:`~sqlalchemy.schema.Table`,
        :class:`~sqlalchemy.schema.MetaData`.

        Note that when "standard output" mode is enabled,
        this bind will be a "mock" connection handler that cannot
        return results and is only appropriate for a very limited
        subset of commands.

        """
        return self.connection
项目:Comparative-Annotation-Toolkit    作者:ComparativeGenomicsToolkit    | 项目源码 | 文件源码
def reflect_hints_db(db_path):
    """
    Reflect the database schema of the hints database, automapping the existing tables

    The NullPool is used to avoid concurrency issues with luigi. Using this activates pooling, but since sqlite doesn't
    really support pooling, what effectively happens is just that it locks the database and the other connections wait.

    :param db_path: path to hints sqlite database
    :return: sqlalchemy.MetaData object, sqlalchemy.orm.Session object
    """
    engine = sqlalchemy.create_engine('sqlite:///{}'.format(db_path), poolclass=NullPool)
    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)
    Base = automap_base(metadata=metadata)
    Base.prepare()
    speciesnames = Base.classes.speciesnames
    seqnames = Base.classes.seqnames
    hints = Base.classes.hints
    featuretypes = Base.classes.featuretypes
    Session = sessionmaker(bind=engine)
    session = Session()
    return speciesnames, seqnames, hints, featuretypes, session
项目:aiohttp_admin    作者:aio-libs    | 项目源码 | 文件源码
def sa_table():
    choices = ['a', 'b', 'c']
    meta = sa.MetaData()
    post = sa.Table(
        'test_post', meta,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('category', sa.String(200), nullable=True),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        # sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.DateTime, nullable=False),
        # sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='{}'),
        sa.Column('status',
                  sa.Enum(*choices, name="enum_name", native_enum=False),
                  server_default="a", nullable=False),
        sa.Column('visible', sa.Boolean, nullable=False),

        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
    return post
项目:aiohttp_admin    作者:aio-libs    | 项目源码 | 文件源码
def table():

    meta = sa.MetaData()
    post = sa.Table(
        'post', meta,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.Date, nullable=False),
        sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='[]'),

        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
    return post
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_dtype(self):
        cols = ['A', 'B']
        data = [(0.8, True),
                (0.9, None)]
        df = DataFrame(data, columns=cols)
        df.to_sql('dtype_test', self.conn)
        df.to_sql('dtype_test2', self.conn, dtype={'B': sqlalchemy.TEXT})
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        sqltype = meta.tables['dtype_test2'].columns['B'].type
        self.assertTrue(isinstance(sqltype, sqlalchemy.TEXT))
        self.assertRaises(ValueError, df.to_sql,
                          'error', self.conn, dtype={'B': str})

        # GH9083
        df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)})
        meta.reflect()
        sqltype = meta.tables['dtype_test3'].columns['B'].type
        self.assertTrue(isinstance(sqltype, sqlalchemy.String))
        self.assertEqual(sqltype.length, 10)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_notnull_dtype(self):
        cols = {'Bool': Series([True, None]),
                'Date': Series([datetime(2012, 5, 1), None]),
                'Int': Series([1, None], dtype='object'),
                'Float': Series([1.1, None])
                }
        df = DataFrame(cols)

        tbl = 'notnull_dtype_test'
        df.to_sql(tbl, self.conn)
        returned_df = sql.read_sql_table(tbl, self.conn)  # noqa
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        if self.flavor == 'mysql':
            my_type = sqltypes.Integer
        else:
            my_type = sqltypes.Boolean

        col_dict = meta.tables[tbl].columns

        self.assertTrue(isinstance(col_dict['Bool'].type, my_type))
        self.assertTrue(isinstance(col_dict['Date'].type, sqltypes.DateTime))
        self.assertTrue(isinstance(col_dict['Int'].type, sqltypes.Integer))
        self.assertTrue(isinstance(col_dict['Float'].type, sqltypes.Float))
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def generate_asset_db_metadata(bind=None):
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    metadata = sa.MetaData(bind=bind)
    _version_table_schema(metadata)
    _equities_table_schema(metadata)
    _futures_exchanges_schema(metadata)
    _futures_root_symbols_schema(metadata)
    _futures_contracts_schema(metadata)
    _asset_router_schema(metadata)
    return metadata


# A list of the names of all tables in the assets db
# NOTE: When modifying this schema, update the ASSET_DB_VERSION value
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_table_columns(self):
        meta = MetaData(self.bind)
        user_tmp = self.tables.user_tmp
        insp = inspect(meta.bind)
        cols = insp.get_columns('user_tmp')
        self.assert_(len(cols) > 0, len(cols))

        for i, col in enumerate(user_tmp.columns):
            eq_(col.name, cols[i]['name'])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_invocation(self):

        dbapi_session = ReplayableSession()
        creator = config.db.pool._creator
        recorder = lambda: dbapi_session.recorder(creator())
        engine = create_engine(
            config.db.url, creator=recorder,
            use_native_hstore=False)
        self.metadata = MetaData(engine)
        self.engine = engine
        self.session = Session(engine)

        self.setup_engine()
        try:
            self._run_steps(ctx=self._dummy_ctx)
        finally:
            self.teardown_engine()
            engine.dispose()

        player = lambda: dbapi_session.player()
        engine = create_engine(
            config.db.url, creator=player,
            use_native_hstore=False)

        self.metadata = MetaData(engine)
        self.engine = engine
        self.session = Session(engine)

        self.setup_engine()
        try:
            self._run_steps(ctx=profiling.count_functions)
        finally:
            self.session.close()
            engine.dispose()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_metadata(bind):
    """Return the metadata for a bind."""
    if bind == '':
        bind = None
    m = MetaData()
    for t in target_metadata.tables.values():
        if t.info.get('bind_key') == bind:
            t.tometadata(m)
    return m
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Read current structure from database"""
        super().__init__(**kwargs)

        # Generate mappings from existing tables
        metadata = MetaData(schema='raw')
        metadata.reflect(self.engine)
        Base = automap_base(metadata=metadata)
        Base.prepare()

        # Our fundamental objects are:
        self.Onion = Base.classes.hs_history
        self.Example = Base.classes.frontpage_examples
        self.Cell = Base.classes.frontpage_traces
        self.Crawl = Base.classes.crawls
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def tmp_table(self):
        extra = {}
        meta = MetaData(**extra.get('metadata_params', {}))
        return Table(
            self.sql_table_name, meta,
            schema=self.schema or None,
            autoload=True,
            autoload_with=self.local_engine)
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def init_db(db_info):
    db_conn = "mysql+pymysql://{0}:{1}@{2}/{3}".format(
        db_info['user'], db_info['password'], db_info['host'], db_info['db'])
    engine = create_engine(db_conn, echo=False)
    metadata = MetaData(engine, reflect=True)

    sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                     'FuturesOption', 'Forex', 'Bond', 'MutualFund',
                     'CFD', 'Warrant']
    for sectype in sec_type_list:
        if sectype not in metadata.tables.keys():
            table = _gen_sa_table(sectype, metadata=metadata)
            table.create(engine, checkfirst=True)
    engine.dispose()
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def _clear_db(self):
        # delete all exsiting tables and create new tables
        engine = create_engine(self.db_conn, echo=False)
        metadata = MetaData(engine, reflect=True)
        sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                         'FuturesOption', 'Forex', 'Bond', 'MutualFund',
                         'CFD', 'Warrant']
        for sectype in sec_type_list:
            if sectype in metadata.tables.keys():
                metadata.tables[sectype].drop()
        engine.dispose()
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def test_init_db(self):
        self._clear_db()
        init_db(self.db_info)
        engine = create_engine(self.db_conn, echo=False)
        metadata = MetaData(engine, reflect=True)
        sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                         'FuturesOption', 'Forex', 'Bond', 'MutualFund',
                         'CFD', 'Warrant']
        for sectype in sec_type_list:
            self.assertIn(sectype, metadata.tables.keys())
        engine.dispose()
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def test_insert_hist_data(self):
        self._clear_db()
        init_db(self.db_info)

        # Insert two time-overlapped MarketDataBlocks
        async def run(loop, data):
            engine = await aiosa.create_engine(
                user=self.db_info['user'], db=self.db_info['db'],
                host=self.db_info['host'], password=self.db_info['password'],
                loop=loop, echo=False)
            await insert_hist_data(engine, 'Stock', data[0])
            await insert_hist_data(engine, 'Stock', data[1])
            engine.close()
            await engine.wait_closed()

        # Execute insertion
        blk0 = MarketDataBlock(testdata_insert_hist_data[0])
        blk1 = MarketDataBlock(testdata_insert_hist_data[1])
        data = [blk0, blk1]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run(loop, data))

        # Verify insertion
        df_source = testdata_insert_hist_data[2]
        engine = create_engine(self.db_conn)
        conn = engine.connect()
        metadata = MetaData(engine, reflect=True)
        table = metadata.tables['Stock']
        result = conn.execute(select([table]))
        # self.assertEqual(result.keys(), list(df_source.columns))
        df = pd.DataFrame(result.fetchall())
        df.columns = result.keys()
        _logger.debug(df.TickerTime[0])
        df.TickerTime = pd.DatetimeIndex(df.TickerTime).tz_localize('UTC')
        df_source.TickerTime = df_source.TickerTime.apply(pd.Timestamp)
        _logger.debug(df.iloc[0])
        assert_frame_equal(df, df_source)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def load_table(name, connection):
    return sa.Table(name, sa.MetaData(), autoload=True,
                    autoload_with=connection)
项目:stacker    作者:bamine    | 项目源码 | 文件源码
def initialize(self):
        metadata = MetaData()
        logs = Table(self.table_name, metadata,
                     Column('task', String, primary_key=True),
                     Column('date_time', DateTime, primary_key=True),
                     Column('model', String),
                     Column('parameters', String),
                     Column('score', Float),
                     Column('scorer_name', String),
                     Column('validation_method', String),
                     Column('predictions', String),
                     Column('random_state', Integer))
        mapper(self.OptimizationResultLog, logs)
        metadata.create_all(bind=self.engine)
项目:triage    作者:dssg    | 项目源码 | 文件源码
def table_object(table_name, db_engine):
    """Produce a table object for the given table name

    This does not load data about the table from the engine yet,
    so it is safe to call for a table that doesn't exist.

    Args:
        table_name (string) A table name (with schema)
        db_engine (sqlalchemy.engine)

    Returns: (sqlalchemy.Table)
    """
    schema, table = split_table(table_name)
    meta = MetaData(schema=schema, bind=db_engine)
    return Table(table, meta)