Python alembic.op 模块,bulk_insert() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用alembic.op.bulk_insert()

项目:cookiecutter-graphene-flask    作者:karec    | 项目源码 | 文件源码
def upgrade():
    """Populate the ``user`` and ``article`` tables with fixture rows.

    Meant for test databases only. Rows come from ``generate_users`` and
    ``generate_articles``, which are defined outside this function.
    """
    user_rows = [generate_users(n) for n in range(1, 52)]
    article_rows = [generate_articles(n, 50) for n in range(1, 200)]

    # Ad-hoc table descriptions: just enough structure for the INSERTs below.
    user_table = table(
        'user',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True),
        sa.Column('username', sa.String(length=80), nullable=False),
        sa.Column('email', sa.String(length=120), nullable=False),
    )
    article_table = table(
        'article',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True),
        sa.Column('title', sa.String(length=100), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
    )

    # Users are inserted before the articles that carry a user_id.
    op.bulk_insert(user_table, user_rows)
    op.bulk_insert(article_table, article_rows)
项目:vilfredo-core    作者:fairdemocracy    | 项目源码 | 文件源码
def upgrade():
    """Create ``voting_types``, seed it, and reference it from ``question``."""
    # ### commands auto generated by Alembic - please adjust! ###
    voting_types = op.create_table(
        'voting_types',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=25), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

    # Seed the lookup table with the known voting types.
    seed_rows = [{'name': label} for label in ('triangle', 'linear')]
    op.bulk_insert(voting_types, seed_rows)

    # NOTE(review): server_default "1" presumably points existing questions at
    # the first seeded row -- confirm the generated ids start at 1.
    voting_type_column = sa.Column(
        'voting_type_id', sa.Integer(), nullable=False, server_default="1")
    op.add_column(u'question', voting_type_column)
    op.create_foreign_key(
        'fk_quesion_voting_types',
        'question',
        'voting_types',
        ['voting_type_id'],
        ['id'],
    )
    # ### end Alembic commands ###
项目:vilfredo-core    作者:fairdemocracy    | 项目源码 | 文件源码
def upgrade():
    """Create ``question_types``, seed it, and reference it from ``question``."""
    # ### commands auto generated by Alembic - please adjust! ###
    type_columns = (
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=25), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    question_types = op.create_table('question_types', *type_columns)

    # Populate the new table with the two question types.
    initial_types = [
        {'name': 'standard'},
        {'name': 'image'},
    ]
    op.bulk_insert(question_types, initial_types)

    # The new column is NOT NULL with a server-side default of "1".
    op.add_column(
        u'question',
        sa.Column('question_type_id', sa.Integer(), nullable=False, server_default="1"),
    )
    op.create_foreign_key(
        'fk_quesion_question_types', 'question', 'question_types',
        ['question_type_id'], ['id'])
    # ### end Alembic commands ###
项目:akamatsu    作者:rmed    | 项目源码 | 文件源码
def upgrade():
    """Insert the ``superblogger`` and ``superuploader`` rows into ``roles``."""
    # Ad-hoc table description exposing only the column being written.
    name_column = sa.sql.column('name', sa.String)
    roles = sa.sql.table('roles', name_column)

    new_roles = [
        {'name': 'superblogger'},
        {'name': 'superuploader'},
    ]
    op.bulk_insert(roles, new_roles)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def bulk_insert(self, table, rows):
    """Insert several rows into *table* via the current migration context.

    The same call serves both modes of operation: executing against a
    live connection, and generating a SQL script. When a script is
    generated, the values are rendered inline into the statement.

    :param table: table object the rows are inserted into.
    :param rows: list of dictionaries, one per row.

    Example::

        from alembic import op
        from datetime import date
        from sqlalchemy.sql import table, column
        from sqlalchemy import String, Integer, Date

        # Ad-hoc table used only to build the insert statement.
        accounts_table = table('account',
            column('id', Integer),
            column('name', String),
            column('create_date', Date)
        )

        op.bulk_insert(accounts_table,
            [
                {'id':1, 'name':'John Smith',
                        'create_date':date(2010, 10, 5)},
                {'id':2, 'name':'Ed Williams',
                        'create_date':date(2007, 5, 27)},
                {'id':3, 'name':'Wendy Jones',
                        'create_date':date(2008, 8, 15)},
            ]
        )
    """
    # All the work is delegated to the implementation object.
    backend = self.impl
    backend.bulk_insert(table, rows)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def bulk_insert(self, table, rows):
    """Issue one multi-row INSERT through the current migration context.

    Works the same way whether the migration runs on a live connection
    or is rendered to a SQL script; in the script case the values are
    written inline into the statement.

    :param table: table object to insert into.
    :param rows: list of dictionaries holding the row values.

    e.g.::

        from alembic import op
        from datetime import date
        from sqlalchemy.sql import table, column
        from sqlalchemy import String, Integer, Date

        # Create an ad-hoc table to use for the insert statement.
        accounts_table = table('account',
            column('id', Integer),
            column('name', String),
            column('create_date', Date)
        )

        op.bulk_insert(accounts_table,
            [
                {'id':1, 'name':'John Smith',
                        'create_date':date(2010, 10, 5)},
                {'id':2, 'name':'Ed Williams',
                        'create_date':date(2007, 5, 27)},
                {'id':3, 'name':'Wendy Jones',
                        'create_date':date(2008, 8, 15)},
            ]
        )
    """
    # Hand the request straight to the implementation object.
    do_insert = self.impl.bulk_insert
    do_insert(table, rows)
项目:Adventure-Insecure    作者:colinnewell    | 项目源码 | 文件源码
def upgrade():
    """Create the ``office_numbers`` table and insert three offices."""
    # ### commands auto generated by Alembic - please adjust! ###
    office_numbers = op.create_table(
        'office_numbers',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('date_created', sa.DateTime(), nullable=True),
        sa.Column('date_modified', sa.DateTime(), nullable=True),
        sa.Column('name', sa.String(length=128), nullable=False),
        sa.Column('number_prefix', sa.String(length=128), nullable=False),
        sa.Column('main_number', sa.String(length=128), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name'),
    )

    # (name, number_prefix, main_number) for each office to seed.
    offices = (
        ('Main Office', '555-1', '555-13211-3221'),
        ('Babel Office', '555-2', '555-21111-3221'),
        ('Office Minnows', '555-9', '555-99911-3221'),
    )
    op.bulk_insert(office_numbers, [
        {'name': name, 'number_prefix': prefix, 'main_number': number}
        for name, prefix, number in offices
    ])
    # ### end Alembic commands ###
项目:liebraryrest    作者:gekorob    | 项目源码 | 文件源码
def _seed(table_obj, file_path, tx=None):
    """Bulk-insert the rows of a comma-delimited file into *table_obj*.

    Each row is read as a dict by ``DictReader`` (presumably
    ``csv.DictReader`` -- the import is outside this block).

    Args:
        table_obj: Table passed to ``op.bulk_insert``.
        file_path: Path of the delimited text file to load.
        tx: Optional value forwarded with every row to ``_tx_row``; when
            ``None`` the rows are inserted exactly as read.
    """
    # newline='' leaves newline handling to the csv module, which it needs in
    # order to parse quoted fields that contain line breaks correctly.
    with open(file_path, 'r', newline='') as f:
        rows = list(DictReader(f, delimiter=','))

    if tx is not None:
        rows = [_tx_row(row, tx) for row in rows]

    op.bulk_insert(table_obj, rows)
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def copy_table(session, model_cls):
    """Bulk-insert every *model_cls* row loaded through *session* into its table."""
    instances = session.query(model_cls).all()
    # vars() yields each instance's attribute dictionary, used as the row payload.
    payload = [vars(instance) for instance in instances]
    op.bulk_insert(model_cls.__table__, payload)
项目:flask_boilerplate    作者:minodes    | 项目源码 | 文件源码
def upgrade():
    """Insert a ``test_user`` account, but only in the test and dev environments."""
    # Any other environment is left untouched.
    if current_env not in ('test', 'dev'):
        return

    # Ad-hoc description of the users table for the INSERT below.
    user_columns = [
        sa.Column('id', postgresql.UUID, server_default=sa.text('uuid_generate_v1()'), primary_key=True),
        sa.Column('name', sa.Text, nullable=False, unique=True),
        sa.Column('password', sa.Text, nullable=False),
        sa.Column('modified', sa.DateTime, server_default=sa.text('clock_timestamp()')),
        sa.Column('created', sa.DateTime, server_default=sa.text('now()')),
    ]
    users_table = table('users', *user_columns)

    # Only name and password are supplied; the other columns are not set here.
    seed_user = {'name': "test_user", 'password': hash_password('test123')}
    op.bulk_insert(users_table, [seed_user])
项目:research-eGrader    作者:openstax    | 项目源码 | 文件源码
def upgrade():
    """Insert the Sociology subject (id 3) into ``subject_table``."""
    sociology = {'id': 3, 'name': 'Sociology', 'tag': 'book:stax-soc'}
    op.bulk_insert(subject_table, [sociology])
项目:research-eGrader    作者:openstax    | 项目源码 | 文件源码
def upgrade():
    """Add subject support.

    Creates ``subjects`` and ``user_unqualified_exercises``, adds subject
    columns and foreign keys to ``exercises`` and ``responses``, then inserts
    the Biology and Physics subjects.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'subjects',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=True),
        sa.Column('tag', sa.String(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_table(
        'user_unqualified_exercises',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
        sa.Column('exercise_id', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['exercise_id'], ['exercises.id'], ),
        sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
        sa.PrimaryKeyConstraint('id'),
    )

    # exercises -> subjects link
    op.add_column(u'exercises', sa.Column('subject_id', sa.Integer(), nullable=True))
    op.create_foreign_key(
        'exercises_subject_id_fkey', 'exercises', 'subjects', ['subject_id'], ['id'])

    # responses get both a free-text subject and a subjects link
    op.add_column(u'responses', sa.Column('subject', sa.String(), nullable=True))
    op.add_column(u'responses', sa.Column('subject_id', sa.Integer(), nullable=True))
    op.create_foreign_key(
        'responses_subject_id_fkey', 'responses', 'subjects', ['subject_id'], ['id'])
    # ### end Alembic commands ###

    # Seed rows go through ``subject_table``, which is defined outside this function.
    initial_subjects = [
        {'id': 1, 'name': 'Biology', 'tag': 'apbio'},
        {'id': 2, 'name': 'Physics', 'tag': 'k12phys'},
    ]
    op.bulk_insert(subject_table, initial_subjects)
项目:doorman    作者:mwielgoszewski    | 项目源码 | 文件源码
def upgrade():
    """Move per-node task data from ``distributed_query`` to ``distributed_query_task``.

    Steps, in order:

    1. Create the ``distributed_query_task`` table.
    2. Copy one task row for every existing ``distributed_query`` row.
    3. Add ``description`` to ``distributed_query`` and drop the columns and
       constraints that now live on the task table.
    4. Add ``distributed_query_result.distributed_query_task_id``, fill it
       from ``distributed_query_id``, then make it NOT NULL with a foreign key.
    """
    ### commands auto generated by Alembic - please adjust! ###
    DistributedQueryTask = namedtuple('DistributedQueryTask', [
        'id', 'status', 'retrieved', 'guid', 'node_id'])

    distributed_query_task = op.create_table('distributed_query_task',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('guid', sa.String(), nullable=False),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('distributed_query_id', sa.Integer(), nullable=False),
        sa.Column('node_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['distributed_query_id'], ['distributed_query.id'], ),
        sa.ForeignKeyConstraint(['node_id'], ['node.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('guid')
    )

    # Wrapped in sa.text(): Connection.execute() rejects plain SQL strings on
    # SQLAlchemy 2.0, while text() is accepted by older releases as well.
    cursor = op.get_bind().execute(sa.text("""
        SELECT id, status, retrieved, guid, node_id
        FROM distributed_query
        ORDER BY id;"""
    ))
    results = [DistributedQueryTask._make(row) for row in cursor.fetchall()]

    # The task rows do not set ``id``; the old ``retrieved`` value becomes the
    # task ``timestamp``.
    distributed_query_tasks = [dict(
        distributed_query_id=r.id,
        status=r.status,
        timestamp=r.retrieved,
        guid=r.guid,
        node_id=r.node_id) for r in results]

    op.bulk_insert(distributed_query_task, distributed_query_tasks)

    op.add_column(u'distributed_query', sa.Column('description', sa.String(), nullable=True))
    op.drop_constraint(u'distributed_query_guid_key', 'distributed_query', type_='unique')
    op.drop_constraint(u'distributed_query_node_id_fkey', 'distributed_query', type_='foreignkey')
    op.drop_column(u'distributed_query', 'status')
    op.drop_column(u'distributed_query', 'retrieved')
    op.drop_column(u'distributed_query', 'guid')
    op.drop_column(u'distributed_query', 'node_id')
    op.add_column(u'distributed_query_result', sa.Column('distributed_query_task_id', sa.Integer(), nullable=True))

    # distributed queries and tasks were the same before,
    # so their id's will remain the same as well.
    # NOTE(review): this assumes the ids generated for the task rows match the
    # old query ids (rows are inserted in id order without an explicit id) --
    # confirm there are no gaps in distributed_query.id.
    op.execute("""
        UPDATE distributed_query_result
        SET distributed_query_task_id = distributed_query_id;"""
    )

    # Tighten the column only after every row has been given a value.
    op.alter_column(u'distributed_query_result', 'distributed_query_task_id', nullable=False)
    op.create_foreign_key(None, 'distributed_query_result', 'distributed_query_task', ['distributed_query_task_id'], ['id'])
    ### end Alembic commands ###
项目:doorman    作者:mwielgoszewski    | 项目源码 | 文件源码
def downgrade():
    """Fold ``distributed_query_task`` back into a single ``distributed_query`` table.

    The joined query/task rows are read first. The result-table link and the
    task foreign key are then dropped, ``distributed_query`` is dropped and
    re-created with the task columns, the saved rows are inserted, and
    ``distributed_query_task`` is dropped last.
    """
    ### commands auto generated by Alembic - please adjust! ###
    DistributedQuery = namedtuple('DistributedQuery', [
        'task_id', 'query_id',
        'guid', 'status', 'sql', 'timestamp', 'not_before',
        'retrieved', 'node_id'])

    # Wrapped in sa.text(): Connection.execute() rejects plain SQL strings on
    # SQLAlchemy 2.0, while text() is accepted by older releases as well.
    cursor = op.get_bind().execute(sa.text("""
        SELECT DISTINCT t.id AS task_id, q.id AS query_id,
            t.guid, t.status, q.sql, q.timestamp, q.not_before,
            t.timestamp AS retrieved, t.node_id
        FROM distributed_query q
        INNER JOIN distributed_query_task t
        ON q.id = t.distributed_query_id
        ORDER BY t.id;
    """))

    # Rows are fetched here, before the source tables are dropped below.
    results = [DistributedQuery._make(row) for row in cursor.fetchall()]

    op.drop_constraint(u'distributed_query_result_distributed_query_task_id_fkey', 'distributed_query_result', type_='foreignkey')
    op.drop_column(u'distributed_query_result', 'distributed_query_task_id')

    op.drop_constraint(u'distributed_query_task_distributed_query_id_fkey', 'distributed_query_task', type_='foreignkey')
    op.drop_table(u'distributed_query')

    distributed_query = op.create_table('distributed_query',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('guid', sa.String(), nullable=False),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('sql', sa.String(), nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('not_before', sa.DateTime(), nullable=True),
        sa.Column('retrieved', sa.DateTime(), nullable=True),
        sa.Column('node_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['node_id'], ['node.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('guid')
    )

    # One distributed_query row per saved task row; neither task_id nor
    # query_id is written back, so the insert does not set ``id``.
    distributed_queries = [dict(
        guid=r.guid,
        status=r.status,
        sql=r.sql,
        timestamp=r.timestamp,
        not_before=r.not_before,
        retrieved=r.retrieved,
        node_id=r.node_id) for r in results]

    op.bulk_insert(distributed_query, distributed_queries)
    op.drop_table('distributed_query_task')
    ### end Alembic commands ###
项目:fabric8-analytics-worker    作者:fabric8-analytics    | 项目源码 | 文件源码
def upgrade():
    """Upgrade the database to a newer revision.

    Creates and seeds ``ecosystems``, creates ``packages`` and ``versions``,
    and replaces the package/ecosystem/version columns on ``analyses`` and
    ``analysis_requests`` with a ``version_id`` foreign key.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    backend_enum = sa.Enum(
        'none', 'npm', 'maven', 'pypi', 'rubygems', 'scm', 'crates',
        name='ecosystem_backend_enum',
    )
    ecosystems = op.create_table(
        'ecosystems',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.Column('_backend', backend_enum, nullable=True),
        sa.Column('url', sa.String(length=255), nullable=True),
        sa.Column('fetch_url', sa.String(length=255), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )

    # Seed data, one tuple per ecosystem in the order given by ``fields``.
    fields = ('id', 'name', '_backend', 'url', 'fetch_url')
    seed = (
        (1, 'rubygems', 'rubygems', 'https://rubygems.org/', 'https://rubygems.org/api/v1'),
        (2, 'npm', 'npm', 'https://www.npmjs.com/', 'https://registry.npmjs.org/'),
        (3, 'maven', 'maven', 'https://repo1.maven.org/maven2/', None),
        (4, 'pypi', 'pypi', 'https://pypi.python.org/', 'https://pypi.python.org/pypi'),
        (5, 'go', 'scm', None, None),
        (6, 'crates', 'crates', 'https://crates.io/', None),
    )
    op.bulk_insert(ecosystems, [dict(zip(fields, values)) for values in seed])

    op.create_table(
        'packages',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ecosystem_id', sa.Integer(), nullable=True),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(['ecosystem_id'], ['ecosystems.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('ecosystem_id', 'name', name='ep_unique'),
    )
    op.create_table(
        'versions',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('package_id', sa.Integer(), nullable=True),
        sa.Column('identifier', sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(['package_id'], ['packages.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('package_id', 'identifier', name='pv_unique'),
    )

    # analyses: add version_id with its foreign key, drop the three old columns.
    op.add_column('analyses', sa.Column('version_id', sa.Integer(), nullable=True))
    op.create_foreign_key(None, 'analyses', 'versions', ['version_id'], ['id'])
    for old_column in ('package', 'ecosystem', 'version'):
        op.drop_column('analyses', old_column)

    # analysis_requests: same change, plus epv_index rebuilt on version_id.
    op.add_column('analysis_requests', sa.Column('version_id', sa.Integer(), nullable=True))
    op.drop_index('epv_index', table_name='analysis_requests')
    op.create_index(
        'epv_index', 'analysis_requests', ['version_id'],
        unique=True,
        postgresql_where=sa.text('fulfilled_at IS NULL'),
    )
    op.create_foreign_key(None, 'analysis_requests', 'versions', ['version_id'], ['id'])
    for old_column in ('package', 'ecosystem', 'version'):
        op.drop_column('analysis_requests', old_column)
    # ### end Alembic commands ###
项目:Albireo    作者:lordfriend    | 项目源码 | 文件源码
def upgrade():
    """Create ``video_file`` and back-fill it from existing ``torrentfile`` rows.

    Each torrentfile row with a file path becomes one ``video_file`` row,
    unless ``video_manager.get_video_meta`` returns ``None`` for its file.
    Afterwards, episodes with status 1 are reset to status 0.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    video_file_table = op.create_table(
        'video_file',
        sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('bangumi_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('episode_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('file_name', sa.String(), nullable=True),
        sa.Column('file_path', sa.String(), nullable=True),
        sa.Column('torrent_id', sa.String(), nullable=True),
        sa.Column('download_url', sa.String(), nullable=True),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('resolution_w', sa.Integer(), nullable=True),
        sa.Column('resolution_h', sa.Integer(), nullable=True),
        sa.Column('duration', sa.Integer(), nullable=True),
        sa.Column('label', sa.String(), nullable=True),
        sa.ForeignKeyConstraint(['bangumi_id'], ['bangumi.id'], ),
        sa.ForeignKeyConstraint(['episode_id'], ['episodes.id'], ),
        sa.PrimaryKeyConstraint('id'),
    )
    # ### end Alembic commands ###

    connection = op.get_bind()
    select_torrent_files = sa.text(
        'SELECT t.episode_id, t.torrent_id, t.file_path, eps.bangumi_id, eps.episode_no FROM torrentfile t LEFT JOIN episodes eps ON eps.id = t.episode_id WHERE file_path NOTNULL')

    # video_file column <- key in the metadata returned by video_manager
    meta_columns = (
        ('resolution_w', 'width'),
        ('resolution_h', 'height'),
        ('duration', 'duration'),
    )

    pending_rows = []
    for episode_id, torrent_id, file_path, bangumi_id, _episode_no in connection.execute(select_torrent_files):
        # A torrent id of -1, or one accepted by __is_uuid4, is stored as NULL.
        if torrent_id == -1 or __is_uuid4(torrent_id):
            torrent_id = None

        record = {
            'id': uuid4(),
            'status': 3,
            'torrent_id': torrent_id,
            'episode_id': episode_id,
            'file_path': file_path,
            'bangumi_id': bangumi_id,
        }

        video_path = u'{0}/{1}/{2}'.format(get_base_path(), str(bangumi_id), file_path)
        meta_info = video_manager.get_video_meta(video_path)
        if meta_info is None:
            # No metadata for this file: the row is not migrated.
            continue

        for column, meta_key in meta_columns:
            record[column] = meta_info.get(meta_key)
        pending_rows.append(record)

    op.bulk_insert(video_file_table, pending_rows)

    connection.execute(sa.text('UPDATE episodes SET status = 0 WHERE status = 1'))