Python sqlalchemy.sql 模块,text() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.sql.text()

项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_temp_table_names(self, connection, **kw):
        schema = self.denormalize_name(self.default_schema_name)

        sql_str = "SELECT table_name FROM all_tables WHERE "
        if self.exclude_tablespaces:
            sql_str += (
                "nvl(tablespace_name, 'no tablespace') "
                "NOT IN (%s) AND " % (
                    ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
                )
            )
        sql_str += (
            "OWNER = :owner "
            "AND IOT_NAME IS NULL "
            "AND DURATION IS NOT NULL")

        cursor = connection.execute(sql.text(sql_str), owner=schema)
        return [self.normalize_name(row[0]) for row in cursor]
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_table_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        TABLE_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'U'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")

        tables = connection.execute(TABLE_SQL, schema_name=schema)

        return [t["name"] for t in tables]
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _literal_as_text(element, warn=False):
    if isinstance(element, Visitable):
        return element
    elif hasattr(element, '__clause_element__'):
        return element.__clause_element__()
    elif isinstance(element, util.string_types):
        if warn:
            util.warn_limited(
                "Textual SQL expression %(expr)r should be "
                "explicitly declared as text(%(expr)r)",
                {"expr": util.ellipses_string(element)})

        return TextClause(util.text_type(element))
    elif isinstance(element, (util.NoneType, bool)):
        return _const_expr(element)
    else:
        raise exc.ArgumentError(
            "SQL expression object or string expected, got object of type %r "
            "instead" % type(element)
        )
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def fix_task_date():
    """Fix Date format in Task."""
    import re
    from datetime import datetime
    with app.app_context():
        query = text('''SELECT id, created FROM task WHERE created LIKE ('%Date%')''')
        results = db.engine.execute(query)
        tasks = results.fetchall()
        for task in tasks:
            # It's in miliseconds
            timestamp = int(re.findall(r'\d+', task.created)[0])
            print timestamp
            # Postgresql expects this format 2015-05-21T13:19:06.471074
            fixed_created = datetime.fromtimestamp(timestamp/1000)\
                                    .replace(microsecond=timestamp%1000*1000)\
                                    .strftime('%Y-%m-%dT%H:%M:%S.%f')
            query = text('''UPDATE task SET created=:created WHERE id=:id''')
            db.engine.execute(query, created=fixed_created, id=task.id)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def add_custom_contrib_button_to(project, user_id_or_ip):
    """Add a customized contrib button for a project."""
    if type(project) != dict:
        project = project.dictize()
    project['contrib_button'] = check_contributing_state(project,
                                                         **user_id_or_ip)
    query = text('''
                 SELECT COUNT(id) as ct from blogpost
                 WHERE project_id=:project_id;
                 ''')
    results = session.execute(query, dict(project_id=project['id']))
    for row in results:
        project['n_blogposts'] = row.ct

    project['n_results'] = n_results(project['id'])

    return project
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def rank_and_score(user_id):
    """Return rank and score for a user."""
    # See: https://gist.github.com/tokumine/1583695
    sql = text('''
               WITH global_rank AS (
                    WITH scores AS (
                        SELECT user_id, COUNT(*) AS score FROM task_run
                        WHERE user_id IS NOT NULL GROUP BY user_id)
                    SELECT user_id, score, rank() OVER (ORDER BY score desc)
                    FROM scores)
               SELECT * from global_rank WHERE user_id=:user_id;
               ''')
    results = session.execute(sql, dict(user_id=user_id))
    rank_and_score = dict(rank=None, score=None)
    for row in results:
        rank_and_score['rank'] = row.rank
        rank_and_score['score'] = row.score
    return rank_and_score
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def projects_contributed(user_id):
    """Return projects that user_id has contributed to."""
    sql = text('''
               WITH projects_contributed as
                    (SELECT DISTINCT(project_id) FROM task_run
                     WHERE user_id=:user_id)
               SELECT project.id, project.name, project.short_name, project.owner_id,
               project.description, project.info FROM project, projects_contributed
               WHERE project.id=projects_contributed.project_id ORDER BY project.name DESC;
               ''')
    results = session.execute(sql, dict(user_id=user_id))
    projects_contributed = []
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_contributed.append(project)
    return projects_contributed
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def published_projects(user_id):
    """Return published projects for user_id."""
    sql = text('''
               SELECT project.id, project.name, project.short_name, project.description,
               project.owner_id,
               project.info
               FROM project
               WHERE project.published=true
               AND project.owner_id=:user_id;
               ''')
    projects_published = []
    results = session.execute(sql, dict(user_id=user_id))
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_published.append(project)
    return projects_published
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def draft_projects(user_id):
    """Return draft projects for user_id."""
    sql = text('''
               SELECT project.id, project.name, project.short_name, project.description,
               project.owner_id,
               project.info
               FROM project
               WHERE project.owner_id=:user_id
               AND project.published=false;
               ''')
    projects_draft = []
    results = session.execute(sql, dict(user_id=user_id))
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_draft.append(project)
    return projects_draft
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_top5_projects_24_hours():
    """Return the top 5 projects more active in the last 24 hours."""
    # Top 5 Most active projects in last 24 hours
    sql = text('''SELECT project.id, project.name, project.short_name, project.info,
               COUNT(task_run.project_id) AS n_answers FROM project, task_run
               WHERE project.id=task_run.project_id
               AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
               AND DATE(task_run.finish_time) <= NOW()
               GROUP BY project.id
               ORDER BY n_answers DESC LIMIT 5;''')

    results = session.execute(sql, dict(limit=5))
    top5_apps_24_hours = []
    for row in results:
        tmp = dict(id=row.id, name=row.name, short_name=row.short_name,
                   info=row.info, n_answers=row.n_answers)
        top5_apps_24_hours.append(tmp)
    return top5_apps_24_hours
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_top5_users_24_hours():
    """Return top 5 users in last 24 hours."""
    # Top 5 Most active users in last 24 hours
    sql = text('''SELECT "user".id, "user".fullname, "user".name,
               COUNT(task_run.project_id) AS n_answers FROM "user", task_run
               WHERE "user".id=task_run.user_id
               AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
               AND DATE(task_run.finish_time) <= NOW()
               GROUP BY "user".id
               ORDER BY n_answers DESC LIMIT 5;''')

    results = session.execute(sql, dict(limit=5))
    top5_users_24_hours = []
    for row in results:
        user = dict(id=row.id, fullname=row.fullname,
                    name=row.name,
                    n_answers=row.n_answers)
        top5_users_24_hours.append(user)
    return top5_users_24_hours
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_top(n=4):
    """Return top n=4 projects."""
    sql = text('''SELECT project.id, project.name, project.short_name, project.description,
               project.info,
               COUNT(project_id) AS total
               FROM task_run, project
               WHERE project_id IS NOT NULL
               AND project.id=project_id
               AND (project.info->>'passwd_hash') IS NULL
               GROUP BY project.id ORDER BY total DESC LIMIT :limit;''')
    results = session.execute(sql, dict(limit=n))
    top_projects = []
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       description=row.description,
                       info=row.info,
                       n_volunteers=n_volunteers(row.id),
                       n_completed_tasks=n_completed_tasks(row.id))

        top_projects.append(Project().to_public_json(project))
    return top_projects


#@memoize(timeout=timeouts.get('BROWSE_TASKS_TIMEOUT'))
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def browse_tasks(project_id, limit=10, offset=0):
    """Cache browse tasks view for a project."""
    sql = text('''
               SELECT task.id, task.n_answers, sum(counter.n_task_runs) as n_task_runs
               FROM task, counter
               WHERE task.id=counter.task_id and task.project_id=:project_id
               GROUP BY task.id
               ORDER BY task.id ASC LIMIT :limit OFFSET :offset
               ''')
    results = session.execute(sql, dict(project_id=project_id,
                                        limit=limit,
                                        offset=offset))
    tasks = []
    for row in results:
        task = dict(id=row.id, n_task_runs=row.n_task_runs,
                    n_answers=row.n_answers)
        task['pct_status'] = _pct_status(row.n_task_runs, row.n_answers)
        tasks.append(task)
    return tasks
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def n_count(category):
    """Count the number of projects in a given category."""
    if category == 'featured':
        return _n_featured()
    if category == 'draft':
        return _n_draft()
    sql = text('''
               WITH uniq AS (
               SELECT COUNT(project.id) FROM project
               LEFT OUTER JOIN category ON project.category_id=category.id
               WHERE
               category.short_name=:category
               AND project.published=true
               AND (project.info->>'passwd_hash') IS NULL
               GROUP BY project.id)
               SELECT COUNT(*) FROM uniq
               ''')

    results = session.execute(sql, dict(category=category))
    count = 0
    for row in results:
        count = row[0]
    return count
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_create_dict_job(self):
        """Test JOB create dict job works."""
        user = UserFactory.create(pro=True)
        project = ProjectFactory.create(owner=user)
        from sqlalchemy.sql import text
        from pybossa.core import db
        sql = text('''SELECT project.id, project.short_name FROM project, "user"
                   WHERE project.owner_id="user".id AND "user".pro=True;''')
        results = db.slave_session.execute(sql)
        jobs_generator = create_dict_jobs(results, get_project_stats, (10 * 60))
        jobs = []
        for job in jobs_generator:
            jobs.append(job)

        err_msg = "There should be only one job"
        assert len(jobs) == 1, err_msg

        job = jobs[0]
        assert 'get_project_stats' in job['name'].__name__
        assert job['args'] == [project.id, project.short_name]
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def get_table_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        TABLE_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'U'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")

        tables = connection.execute(TABLE_SQL, schema_name=schema)

        return [t["name"] for t in tables]
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _literal_as_text(element, warn=False):
    if isinstance(element, Visitable):
        return element
    elif hasattr(element, '__clause_element__'):
        return element.__clause_element__()
    elif isinstance(element, util.string_types):
        if warn:
            util.warn_limited(
                "Textual SQL expression %(expr)r should be "
                "explicitly declared as text(%(expr)r)",
                {"expr": util.ellipses_string(element)})

        return TextClause(util.text_type(element))
    elif isinstance(element, (util.NoneType, bool)):
        return _const_expr(element)
    else:
        raise exc.ArgumentError(
            "SQL expression object or string expected, got object of type %r "
            "instead" % type(element)
        )
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def tabularasa(self):
        """Truncate all tables.

        Used for testing to truncate all tables so the database is clean.
        """
        table_names = [
            'tasks',
            'result_message',
            'active_instance',
            'boot_action',
            'boot_action_status',
        ]

        conn = self.db_engine.connect()
        for t in table_names:
            query_text = sql.text(
                "TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
            conn.execute(query_text)
        conn.close()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def update_node_ceph_flags():
    hosts_query = text("SELECT hostname from nodes")
    hosts = [item[0] for item in db.session.execute(hosts_query).fetchall()]
    hosts_with_ceph = []
    for host in hosts:
        if not tasks.is_ceph_installed_on_node(host):
            continue
        hosts_with_ceph.append(host)
    if not hosts_with_ceph:
        return
    flags_query =\
        "INSERT INTO node_flags (node_id, created, deleted, flag_name, flag_value) "\
        "SELECT id, NOW() at time zone 'utc', NULL, 'ceph_installed', 'true' FROM nodes "\
        "WHERE hostname IN ({})".format(
            ', '.join("'" + host + "'" for host in hosts_with_ceph)
        )
    db.session.execute(text(flags_query))
    db.session.commit()
    for host in hosts_with_ceph:
        tasks.add_k8s_node_labels(
            host, {NODE_CEPH_AWARE_KUBERDOCK_LABEL: "True"})
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def get_table_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        TABLE_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'U'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")

        tables = connection.execute(TABLE_SQL, schema_name=schema)

        return [t["name"] for t in tables]
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _literal_as_text(element, warn=False):
    if isinstance(element, Visitable):
        return element
    elif hasattr(element, '__clause_element__'):
        return element.__clause_element__()
    elif isinstance(element, util.string_types):
        if warn:
            util.warn_limited(
                "Textual SQL expression %(expr)r should be "
                "explicitly declared as text(%(expr)r)",
                {"expr": util.ellipses_string(element)})

        return TextClause(util.text_type(element))
    elif isinstance(element, (util.NoneType, bool)):
        return _const_expr(element)
    else:
        raise exc.ArgumentError(
            "SQL expression object or string expected, got object of type %r "
            "instead" % type(element)
        )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def define_constraint_cascades(self, constraint):
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % constraint.ondelete

        # oracle has no ON UPDATE CASCADE -
        # its only available via triggers
        # http://asktom.oracle.com/tkyte/update_cascade/index.html
        if constraint.onupdate is not None:
            util.warn(
                "Oracle does not contain native UPDATE CASCADE "
                "functionality - onupdates will not be rendered for foreign "
                "keys.  Consider using deferrable=True, initially='deferred' "
                "or triggers.")

        return text
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_table_names(self, connection, schema=None, **kw):
        schema = self.denormalize_name(schema or self.default_schema_name)

        # note that table_names() isn't loading DBLINKed or synonym'ed tables
        if schema is None:
            schema = self.default_schema_name

        sql_str = "SELECT table_name FROM all_tables WHERE "
        if self.exclude_tablespaces:
            sql_str += (
                "nvl(tablespace_name, 'no tablespace') "
                "NOT IN (%s) AND " % (
                    ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
                )
            )
        sql_str += (
            "OWNER = :owner "
            "AND IOT_NAME IS NULL "
            "AND DURATION IS NULL")

        cursor = connection.execute(sql.text(sql_str), owner=schema)
        return [self.normalize_name(row[0]) for row in cursor]
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None,
                            resolve_synonyms=False, dblink='', **kw):
        info_cache = kw.get('info_cache')
        (view_name, schema, dblink, synonym) = \
            self._prepare_reflection_args(connection, view_name, schema,
                                          resolve_synonyms, dblink,
                                          info_cache=info_cache)

        params = {'view_name': view_name}
        text = "SELECT text FROM all_views WHERE view_name=:view_name"

        if schema is not None:
            text += " AND owner = :schema"
            params['schema'] = schema

        rp = connection.execute(sql.text(text), **params).scalar()
        if rp:
            if util.py2k:
                rp = rp.decode(self.encoding)
            return rp
        else:
            return None
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_table_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        TABLE_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'U'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")

        tables = connection.execute(TABLE_SQL, schema_name=schema)

        return [t["name"] for t in tables]
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_DEF_SQL = text("""
          SELECT c.text
          FROM syscomments c JOIN sysobjects o ON c.id = o.id
          WHERE o.name = :view_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(view_name, unicode):
                view_name = view_name.encode("ascii")

        view = connection.execute(VIEW_DEF_SQL, view_name=view_name)

        return view.scalar()
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_view_names(self, connection, schema=None, **kw):
        if schema is None:
            schema = self.default_schema_name

        VIEW_SQL = text("""
          SELECT o.name AS name
          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
          WHERE u.name = :schema_name
            AND o.type = 'V'
        """)

        if util.py2k:
            if isinstance(schema, unicode):
                schema = schema.encode("ascii")
        views = connection.execute(VIEW_SQL, schema_name=schema)

        return [v["name"] for v in views]
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_select_hint_text(self, byfroms):
        return " ".join(
            "/*+ %s */" % text for table, text in byfroms.items()
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def visit_create_index(self, create):
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.dialect_options['oracle']['bitmap']:
            text += "BITMAP "
        text += "INDEX %s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=True),
            preparer.format_table(index.table, use_schema=True),
            ', '.join(
                self.sql_compiler.process(
                    expr,
                    include_table=False, literal_binds=True)
                for expr in index.expressions)
        )
        if index.dialect_options['oracle']['compress'] is not False:
            if index.dialect_options['oracle']['compress'] is True:
                text += " COMPRESS"
            else:
                text += " COMPRESS %d" % (
                    index.dialect_options['oracle']['compress']
                )
        return text
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def has_table(self, connection, table_name, schema=None):
        if not schema:
            schema = self.default_schema_name
        cursor = connection.execute(
            sql.text("SELECT table_name FROM all_tables "
                     "WHERE table_name = :name AND owner = :schema_name"),
            name=self.denormalize_name(table_name),
            schema_name=self.denormalize_name(schema))
        return cursor.first() is not None