我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用psycopg2.pool()。
def __init__(self, dbname: str, isolation: str = "auto-commit"): # Establishes the connection with the backend databases. isolation_level = self.isolation_opts.get(isolation, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) try: if dbname == "YOUR_DATABASE_NAME": self.__conn = self.__pool.getconn() self.__conn.set_isolation_level(isolation_level) else: self.__pool = None self.__conn = None except psycopg2.pool.PoolError as e: self.__pool = None self.__conn = None raise(e) except Exception as e: tb.print_exc() raise(e)
def get_db_connection(): """ psycopg2 connection context manager. Fetch a connection from the connection pool and release it. """ try: connection = pool.getconn() yield connection finally: pool.putconn(connection)
def __init__(self, host, database, user, autocommit=True, pool=PostgresPool, default_schema='public'): """ Instantiate class from given SourceDb entry (one row in the datasource table) :param host: Host address (domain or IP) :param database: Name of the database :param user: User """ self.user = user self.hostname = host self.dbname = database # the password is taken from ~/.pgpass if self.dbname not in self.connection_pools: self.connection_pools[self.dbname] = pool(autocommit=autocommit, minconn=1, maxconn=5, user=self.user, host=self.hostname, database=self.dbname, default_schema=default_schema, connect_timeout=config.DB_CONNECT_TIMEOUT)
def _connect(self): if self.application_name is None: st = inspect.stack() self.application_name = os.path.basename(st[-1][1]) conn_args = dict( host=self.url.hostname, port=self.url.port, database=self.url.database, user=self.url.username, password=self.url.password, application_name=self.application_name + "/" + hgvs.__version__, ) if self.pooling: _logger.info("Using UTA ThreadedConnectionPool") self._pool = psycopg2.pool.ThreadedConnectionPool(hgvs.global_config.uta.pool_min, hgvs.global_config.uta.pool_max, **conn_args) else: self._conn = psycopg2.connect(**conn_args) self._conn.autocommit = True self._ensure_schema_exists() # remap sqlite's ? placeholders to psycopg2's %s self._queries = {k: v.replace('?', '%s') for k, v in six.iteritems(self._queries)}
def __del__(self): """ implicitly called when garbage collections comes around to clean up objects. the database connection is returned to the pool """ if self.__pool and self.__conn: self.__pool.putconn(self.__conn) self.__pool = None self.__conn = None
def disconnect(self): """ close the connection. it doesn't return to the pool """ if self.__pool and self.__conn: self.__pool.putconn(self.__conn, close=True) self.__pool = None self.__conn = None
def getGavoDBConn(): if (g_db_pool): return g_db_pool.getconn() else: raise Exception('connection pool is None when get conn')
def putGavoDBConn(conn): if (g_db_pool): g_db_pool.putconn(conn) else: raise Exception('connection pool is None when put conn')
def close(self): if self.pooling: _logger.warning("Closing pool; future mapping and validation will fail.") self._pool.closeall() else: _logger.warning("Closing connection; future mapping and validation will fail.") if self._conn is not None: self._conn.close()
def connection_pool(max_conn, dsn_dict: Dict[str, str], application_name=psycopg2.__name__): """ Create a connection pool (with up to max_conn connections), where all connections will use the given connection string. """ dsn_values = _dsn_connection_values(dsn_dict, application_name) return psycopg2.pool.ThreadedConnectionPool(1, max_conn, **dsn_values)