Python psycopg2.extras 模块,RealDictCursor() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用psycopg2.extras.RealDictCursor()

项目:postpy    作者:portfoliome    | 项目源码 | 文件源码
def select_dict(conn, query: str, params=None, name=None, itersize=5000):
    """Lazily yield the rows of a select statement as dictionaries.

    This is a generator: the cursor is only opened and the query only
    executed once iteration starts.

    Parameters
    ----------
    conn : database connection
    query : select query string
    params : query parameters.
    name : server side cursor name. defaults to client side.
    itersize : number of records fetched by server.

    Yields
    ------
    One row at a time, in the form produced by ``RealDictCursor``.
    """

    dict_cursor = conn.cursor(name, cursor_factory=RealDictCursor)
    with dict_cursor as cur:
        # itersize is assigned before execute(), as in the original flow.
        cur.itersize = itersize
        cur.execute(query, params)
        yield from cur
项目:knp-utils-py    作者:Kensuke-Mitsuzawa    | 项目源码 | 文件源码
def get_task_status(self, task_id):
        # type: (str)->Union[str,Dict[str,Any]]
        """Fetch the status row stored for ``task_id``.

        Args:
            task_id: value matched against the ``task_id`` column of the
                table named by ``self.table_task_status``.

        Returns:
            The row returned by ``fetchone()`` on a ``RealDictCursor``
            (``None`` when nothing matches). If the query raises, the
            formatted traceback is returned as a string instead.
        """
        # NOTE(review): there is no comma between ``description`` and
        # ``updated_at``, so SQL treats ``updated_at`` as an alias for
        # ``description``. This looks unintended -- confirm against the schema
        # before changing the statement.
        sql_select = "SELECT task_status, description updated_at FROM {} WHERE task_id=%s".format(self.table_task_status)
        cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor

        try:
            cur.execute(sql_select, (task_id, ))
            return cur.fetchone()
        except Exception:
            # Errors are reported to the caller as text rather than raised.
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
            # SystemExit propagate.
            return traceback.format_exc()
        finally:
            # Release the cursor on the failure path as well.
            cur.close()
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def get_music(artist, album, title):
    '''Return a single track as an HTML page or a JSON document.

    The three arguments are passed through ``unquote`` and used to build a
    ``MusicFilter`` that is run against the configured collection. The
    output form is picked by the ``format`` request argument (``html`` by
    default). Anything other than exactly one match yields a 404 tuple; an
    unknown format yields a 400 tuple.
    '''
    page_format = request.args.get('format', 'html')
    artist, album, title = unquote(artist), unquote(album), unquote(title)
    collection = app.config['COLLECTION']
    music_filter = MusicFilter(artists=[artist], albums=[album], titles=[title])
    fetch = partial(collection.filter, cursor_factory=RealDictCursor)
    found = webfilter(fetch, music_filter)
    if len(found) != 1:
        return ('Music not found', 404)
    track = found[0]

    if page_format == 'json':
        return dumps(track, sort_keys=True, indent=4, separators=(',', ': '))
    if page_format == 'html':
        return render_template("music.html", music=track)
    return ('Invalid format, available: json,html', 400)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def remove_redundant_delays(conn, delays):
    """Drop the delays that add nothing to what ``realtime_updates`` holds.

    A delay is kept when no stored row maps to its key (via
    ``get_delay_id``), or when the stored row has a smaller ``s3_path`` and
    a different ``arrival_delay`` or ``departure_delay``.

    Parameters
    ----------
    conn : database connection used to read ``realtime_updates``.
    delays : dict mapping an update id to a delay record; each record is
        indexed by ``'s3_path'``, ``'arrival_delay'`` and
        ``'departure_delay'``.

    Returns
    -------
    dict with the subset of ``delays`` worth storing. An empty ``delays``
    is handed back untouched without querying the database.
    """
    if not delays:
        return delays  # no new delays to add

    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        query = """ SELECT * FROM realtime_updates WHERE id IN %(ids)s """
        cur.execute(query, {'ids': tuple(delays)})
        results = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()

    existing_delays = {get_delay_id(r): r for r in results}
    new_delays = {}
    # ``items()`` works on both Python 2 and 3, unlike ``iteritems()``.
    for key, delay in delays.items():
        # stop not registered in the database yet: always store it
        if key not in existing_delays:
            new_delays[key] = delay
            continue
        current = existing_delays[key]
        # only a file newer than the one in the database can override it
        if not current['s3_path'] < delay['s3_path']:
            continue
        # and only when the timings actually differ
        if current['arrival_delay'] != delay['arrival_delay'] \
                or current['departure_delay'] != delay['departure_delay']:
            new_delays[key] = delay
    return new_delays
项目:otree_virtual_machine_manager    作者:tobiasraabe    | 项目源码 | 文件源码
def get_user(user_name: str):
        """Returns a user entry from PostgreSQL table.

        Parameters
        ----------
        user_name : str
            Value matched against the ``user_name`` column of ``PSQL_TABLE``.
            It is sent as a bound query parameter, never interpolated into
            the SQL text.

        Returns
        -------
        dict_user : dict
            A dictionary with user information (the result of
            ``fetchone()``, so ``None`` when no row matches)

        """

        conn = psycopg2.connect(cursor_factory=RealDictCursor, **PSQL_CONN)
        try:
            # ``with conn`` only ends the transaction; it does not close the
            # connection, hence the explicit close() in ``finally``.
            with conn:
                with conn.cursor() as dict_cur:
                    # Only the table name (module configuration) is
                    # formatted in; the user-supplied value goes through a
                    # placeholder.
                    dict_cur.execute(
                        """SELECT * FROM {} WHERE user_name = %s;"""
                            .format(PSQL_TABLE), (user_name,))
                    dict_user = dict_cur.fetchone()
        finally:
            conn.close()

        return dict_user
项目:igcollect    作者:innogames    | 项目源码 | 文件源码
def execute(conn, query, query_vars=()):
    """Run ``query`` with ``query_vars`` bound and return every fetched row.

    A ``RealDictCursor`` is opened on ``conn`` and used as a context
    manager around the execute/fetch pair.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    with dict_cursor as cur:
        cur.execute(query, query_vars)
        rows = cur.fetchall()
        return rows
项目:igcollect    作者:innogames    | 项目源码 | 文件源码
def execute(conn, query):
    """Run ``query`` on ``conn`` and hand back all rows it produced.

    The rows come from a ``RealDictCursor`` that is used as a context
    manager around the execute/fetch pair.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    with dict_cursor as cur:
        cur.execute(query)
        fetched = cur.fetchall()
        return fetched
项目:igcollect    作者:innogames    | 项目源码 | 文件源码
def connect_and_execute(query, database='postgres'):
    """Open a connection to ``database``, run ``query`` and return its rows.

    The connection is switched to ``ISOLATION_LEVEL_AUTOCOMMIT`` before the
    query runs and is closed whether or not the query succeeds.
    """

    connection = connect(database=database)

    try:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        dict_cursor = connection.cursor(cursor_factory=RealDictCursor)
        dict_cursor.execute(query)
        rows = dict_cursor.fetchall()
    finally:
        connection.close()

    return rows
项目:knp-utils-py    作者:Kensuke-Mitsuzawa    | 项目源码 | 文件源码
def get_processed_documents(self, task_id):
        # type: (str)->Dict[str,Any]
        """Fetch every stored result row that belongs to ``task_id``.

        Args:
            task_id: value matched against the ``task_id`` column of the
                table named by ``self.table_knp_result``.

        Returns:
            The rows returned by ``fetchall()`` on a ``RealDictCursor``. If
            the query raises, the formatted traceback is returned as a
            string instead.
        """
        # NOTE(review): the type comment above promises a Dict, but the
        # success path returns whatever ``fetchall()`` yields and the error
        # path returns a str -- confirm and adjust the annotation.
        sql_select = "SELECT text_id,sentence_index,task_id,knp_result,status FROM {} WHERE task_id=%s".format(self.table_knp_result)
        cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor

        try:
            cur.execute(sql_select, (task_id, ))
            return cur.fetchall()
        except Exception:
            # Errors are reported to the caller as text rather than raised.
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
            # SystemExit propagate.
            return traceback.format_exc()
        finally:
            # Release the cursor on the failure path as well.
            cur.close()
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def get_stats():
    '''Return the music library statistics as an HTML page or as JSON.

    The output form is picked by the ``format`` request argument (``html``
    by default); any other value yields a 400 tuple. The HTML template is
    given two formatting callables, ``bytesToHuman`` and ``secondsToHuman``.
    '''
    page_format = request.args.get('format', 'html')
    collection = app.config['COLLECTION']
    fetch_stats = partial(collection.stats, cursor_factory=RealDictCursor)
    stats = webfilter(fetch_stats)

    def human_size(b):
        return humanfriendly.format_size(b)

    def human_duration(s):
        from datetime import timedelta
        return str(timedelta(seconds=s))

    if page_format == 'json':
        return dumps(stats)
    if page_format == 'html':
        return render_template("stats.html", stats=stats, bytesToHuman=human_size, secondsToHuman=human_duration)
    return ('Invalid format, available: json,html', 400)
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def test_stats(self):
        """Unfiltered collection statistics must equal ``teststats``."""
        actual = self.collection.stats(cursor_factory=RealDictCursor)
        expected = teststats
        self.assertEqual(actual, expected)
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def test_filtered_stats(self):
        """Statistics filtered on the ``rock`` keyword must equal ``filtered_teststats``."""
        rock_only = lib.MusicFilter()
        rock_only.keywords = ['rock']
        actual = self.collection.stats(rock_only, cursor_factory=RealDictCursor)
        self.assertEqual(actual, filtered_teststats)
项目:netcrawl    作者:Wyko    | 项目源码 | 文件源码
def get_next(self):
        '''Fetch the next pending device and flag it as being worked on.

        The row with the lowest ``pending_id`` whose ``working`` column is
        FALSE is selected and then updated to ``working= TRUE``; both
        statements run inside the same ``with self.conn`` block.

        Returns: 
            Dict: The selected row as a plain dictionary keyed by column
                name, or None when no such row exists.
        '''
        # A RealDictCursor hands rows back as dicts instead of tuples
        with self.conn, self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('''
                SELECT * FROM 
                    pending 
                WHERE 
                    working= FALSE
                ORDER BY pending_id ASC LIMIT 1
                ''')
            row = cur.fetchone()

            # Nothing left to hand out
            if not row:
                return None

            # Mark the selected entry as being worked on
            cur.execute('''
                    UPDATE pending 
                    SET working= TRUE
                    WHERE pending_id= %s
                    ''',
                (row['pending_id'],))

            return dict(row)
项目:directory-tests    作者:uktrade    | 项目源码 | 文件源码
def get_dir_db_connection(*, dict_cursor: bool = False):
    """Connect to the Directory DB and return ``(connection, cursor)``.

    The connection settings come from the ``DIR_DB_*`` module constants.
    With ``dict_cursor=True`` the cursor is created with
    ``cursor_factory=RealDictCursor``; otherwise the connection's default
    cursor is used. A ``psycopg2.OperationalError`` raised while connecting
    is logged and re-raised.
    """
    try:
        connection = psycopg2.connect(
            dbname=DIR_DB_NAME, user=DIR_DB_USER, password=DIR_DB_PASSWORD,
            host=DIR_DB_HOST, port=DIR_DB_PORT)
    except psycopg2.OperationalError as err:
        logging.error('Unable to connect to Directory DB!\n%s', err)
        raise
    logging.debug('Connected to Directory DB: %s!', DIR_DB_NAME)

    cursor_options = {'cursor_factory': RealDictCursor} if dict_cursor else {}
    return connection, connection.cursor(**cursor_options)
项目:directory-tests    作者:uktrade    | 项目源码 | 文件源码
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Connect to the SSO DB and return ``(connection, cursor)``.

    The connection settings come from the ``SSO_DB_*`` module constants.
    With ``dict_cursor=True`` the cursor is created with
    ``cursor_factory=RealDictCursor``; otherwise the connection's default
    cursor is used. A ``psycopg2.OperationalError`` raised while connecting
    is logged and re-raised.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME, user=SSO_DB_USER, password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST, port=SSO_DB_PORT)
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to SSO DB!\n%s', e)
        raise
    else:
        # Report the database that was actually connected to (SSO); the
        # message previously named the Directory DB.
        logging.debug('Connected to SSO DB: %s!', SSO_DB_NAME)
    if dict_cursor:
        cursor = connection.cursor(cursor_factory=RealDictCursor)
    else:
        cursor = connection.cursor()
    return connection, cursor
项目:directory-tests    作者:uktrade    | 项目源码 | 文件源码
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Open a connection to the SSO DB and pair it with a cursor.

    Connection settings are read from the ``SSO_DB_*`` module constants. A
    ``psycopg2.OperationalError`` raised while connecting is logged and
    re-raised. The cursor uses ``RealDictCursor`` when ``dict_cursor`` is
    true, and the connection's default cursor otherwise.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME, user=SSO_DB_USER, password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST, port=SSO_DB_PORT)
    except psycopg2.OperationalError as err:
        logging.error('Unable to connect to SSO DB!\n%s', err)
        raise

    if not dict_cursor:
        return connection, connection.cursor()
    return connection, connection.cursor(cursor_factory=RealDictCursor)
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def query_cursor(self, q, lazy_fetch=False, commit=True):
        """Run a query and yield the cursor holding its results.

        All execution performed by the Postgres object uses this method.
        It is a generator that yields exactly once (presumably wrapped as a
        context manager by a decorator outside this view -- confirm).

        Args:
            q (str): SQL query
            lazy_fetch (bool): whether to use a server-side cursor (lazily fetches results).
            commit (bool): whether to call ``self.commit()`` once the
                generator is resumed after the yield.

        When ``self.debug`` is set nothing is executed: a stub whose
        ``fetchmany``/``fetchall`` return empty lists is yielded instead,
        and no commit happens.
        """
        self.cursors_opened += 1

        if self.verbose:
            logging.debug(q)

        if self.debug:
            stub = Bunch()
            stub.fetchmany = lambda size: []
            stub.fetchall = lambda: []
            yield stub
            return

        # Passing a name selects a server-side cursor; None keeps it
        # client-side.
        if lazy_fetch:
            name = 'server_side_{}'.format(self.cursors_opened)
        else:
            name = None

        with self.connection.cursor(name, cursor_factory=RealDictCursor) as cur:
            cur.execute(q)
            yield cur

        if commit:
            self.commit()
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def query_cursor(self, q, lazy_fetch=False, commit=True):
        """Execute ``q`` and yield a cursor positioned on its results.

        All execution performed by the Postgres object uses this method.
        The generator yields exactly once.

        Args:
            q (str): SQL query
            lazy_fetch (bool): whether to use a server-side cursor (lazily fetches results).
            commit (bool): when true, ``self.commit()`` is called after the
                generator is resumed past the yield.

        In debug mode (``self.debug``) the database is not touched: the
        yielded object only offers ``fetchmany``/``fetchall``, both
        returning an empty list, and nothing is committed.
        """
        self.cursors_opened += 1

        if self.verbose:
            logging.debug(q)

        if not self.debug:
            server_side_name = None
            if lazy_fetch:
                server_side_name = 'server_side_{}'.format(self.cursors_opened)

            opened = self.connection.cursor(server_side_name, cursor_factory=RealDictCursor)
            with opened as live_cursor:
                live_cursor.execute(q)
                yield live_cursor

            if commit:
                self.commit()
            return

        # Debug mode: hand out an inert stand-in instead of a real cursor.
        placeholder = Bunch()
        placeholder.fetchmany = lambda size: []
        placeholder.fetchall = lambda: []
        yield placeholder
项目:APITaxi_front    作者:openmaraude    | 项目源码 | 文件源码
def zupc_show_temp():
    """Render ``zupc_show_temp.html`` with rows read from ``zupc_temp``.

    The rows selected are those with ``multiple=true`` whose ``parent_id``
    equals their own ``id``. They are read through a ``RealDictCursor``
    opened on the raw connection behind the SQLAlchemy session; the cursor
    is closed before rendering, even if the query fails.

    Returns:
        The result of ``render_template`` called with the fetched rows
        (``list_zupc``), the current user's ``apikey`` and the configured
        ``MAPBOX_TOKEN``.
    """
    cur = current_app.extensions['sqlalchemy'].db.session.connection().\
            connection.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute("""SELECT id, nom, insee FROM zupc_temp 
                   WHERE multiple=true AND parent_id = id;""")
        list_zupc = cur.fetchall()
    finally:
        # The cursor is only needed for this one query; do not leave it to
        # garbage collection.
        cur.close()

    return render_template("zupc_show_temp.html",
                          list_zupc=list_zupc,
                          apikey=current_user.apikey,
                          mapbox_token=current_app.config['MAPBOX_TOKEN'])
项目:expenses-diary-api    作者:otrabalhador    | 项目源码 | 文件源码
def execute_to_json(conn, query, params=None):
    """Run ``query`` and return its rows as a list of plain dicts.

    Each fetched row is passed through
    ``StringConverter().snake_case_to_camel_case`` and the result is copied
    into a new ``dict``.

    Args:
        conn: database connection used to open a ``RealDictCursor``.
        query: SQL text to execute.
        params: optional query parameters handed to ``cursor.execute``.

    Returns:
        list of dict, one entry per fetched row (empty when there are none).
    """
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query, params)

        # One converter serves every row; there is no need to rebuild it
        # on each iteration.
        convert = StringConverter().snake_case_to_camel_case
        return [dict(convert(row)) for row in cursor.fetchall()]
项目:pgreaper    作者:vincentlaucsb    | 项目源码 | 文件源码
def read_pg(sql, conn=None, **kwargs):
    ''' Read a SQL query and return it as a Table

    Args:
        sql: SQL text to execute.
        conn: database connection. NOTE(review): it defaults to None but is
            used unconditionally, so it is presumably always supplied by
            the caller or by a wrapper outside this view -- confirm.
        **kwargs: accepted but not used in this body.

    Returns:
        A ``Table`` named ``'SQL Query'`` (dialect ``'postgres'``) filled
        with one dict per result row.
    '''

    cur = conn.cursor(cursor_factory=extras.RealDictCursor)
    try:
        cur.execute(sql)
        rows = list(cur)
    finally:
        # Release the cursor whether or not the query succeeded.
        cur.close()

    # Error occurs if a function is used in SQL query
    # and column name is not explicitly provided
    new_table = Table(name='SQL Query', dialect='postgres')
    new_table.add_dicts(rows)

    return new_table