Python psutil 模块,NUM_CPUS 实例源码

我们从 Python 开源项目中提取了以下 4 个代码示例，用于说明如何使用 psutil.NUM_CPUS。

项目:charm-rabbitmq-server    作者:openstack    | 项目源码 | 文件源码
def calculate_threads(self):
        """
        Determine the number of erl vm io threads from the available CPUs.

        The pool size is ``num_cpus * multiplier``, where the multiplier is
        the 'erl-vm-io-thread-multiplier' config option, falling back to
        DEFAULT_MULTIPLIER when the option is unset or falsy. A positive
        multiplier always yields at least one thread.

        When the option is not set at all (None) and is_container() reports
        a container environment, the result is limited to
        MAX_DEFAULT_THREADS.

        @returns int: number of io threads to allocate
        """
        try:
            num_cpus = psutil.cpu_count()
        except AttributeError:
            # Older psutil releases only expose the NUM_CPUS constant.
            num_cpus = psutil.NUM_CPUS

        if not num_cpus:
            # psutil.cpu_count() returns None when the count cannot be
            # determined; assume one CPU rather than failing on None * x.
            num_cpus = 1

        # Read the option once so the multiplier and the cap decision are
        # based on the same value.
        configured = config('erl-vm-io-thread-multiplier')
        multiplier = configured or DEFAULT_MULTIPLIER

        log("Calculating erl vm io thread pool size based on num_cpus={} and "
            "multiplier={}".format(num_cpus, multiplier), DEBUG)

        count = int(num_cpus * multiplier)
        if multiplier > 0 and count == 0:
            # A fractional multiplier must not round the pool down to zero.
            count = 1

        # NOTE(hopem): Limit unconfigured erl-vm-io-thread-multiplier
        #              to MAX_DEFAULT_THREADS to avoid insane pool
        #              configuration in LXD containers on large servers.
        capped = configured is None and is_container()
        if capped:
            count = min(count, MAX_DEFAULT_THREADS)

        # Report whether the cap was actually in effect, not merely whether
        # we are running in a container.
        log("erl vm io thread pool size = {} (capped={})"
            .format(count, capped), DEBUG)

        return count
项目:IoTcube-HMark-cli    作者:squizz617    | 项目源码 | 文件源码
def get_cpu_count():
    """
    Return the number of CPUs, or 1 when it cannot be determined.

    Sources are tried in order: ``multiprocessing.cpu_count()``, then
    ``psutil.cpu_count()``, then the legacy ``psutil.NUM_CPUS`` constant
    exposed by old psutil versions. A missing module, a missing attribute
    or an undetermined (None/0) count all fall through to the default of 1.
    """
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # http://code.google.com/p/psutil/
    try:
        import psutil
    except ImportError:
        return 1

    try:
        count = psutil.cpu_count()
    except AttributeError:
        # Old psutil versions have no cpu_count(); they expose NUM_CPUS.
        count = getattr(psutil, 'NUM_CPUS', None)

    # psutil.cpu_count() returns None when the count is undetermined.
    return count or 1
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t15(factory):
    """
    Check that a broker with no connected clients does not burn CPU.

    Starts a broker, connects and disconnects one client, then samples the
    broker process' CPU usage ten times at 0.3 second intervals. Fails as
    soon as a scaled sample exceeds 90.0.

    Returns True on success and False if a runaway load was detected.
    """
    pretty = '%s t15' % __file__
    print(pretty)

    ws_config = {
        'root'   : factory.HOME.path,
        'env'    : [],
        'tools'  : {},
        'flocker': {
            "ftp": {
                "password": "ftpuser",
                "store": "/srv/www/flocker",
                "port": 21,
                "timeout": 30,
                "user": "ftpuser"
            },
            "host": "cnbjlx20050",
            "enable" : True,
            "http": {
                "doc-root": "/srv/www",
                "port": 80
            }
        }
    }
    factory.write_config('workspace.json', json.dumps(ws_config))

    sock, port = find_free_port()
    # set remote explicitly to avoid reading config from disk
    broker = Broker(
        ('',port), sock, remote={}, authkeys={'admin':None}, hsl_paths=[],
        home=factory.HOME.path
    )
    broker.start()

    result = True
    # from here on the broker is running: always terminate it, even if the
    # client connection or a psutil call raises
    try:
        proc   = psutil.Process(broker.pid)
        remote = RemoteBroker(address=('',port), home=factory.HOME.path)

        remote.list_available() # just to make sure the connection is up
        del remote              # client disconnects itself

        # check the CPU utilization of the broker through its PID
        for _ in range(10):
            # old psutil versions name the API get_cpu_percent()/NUM_CPUS,
            # newer ones cpu_percent()/cpu_count()
            if hasattr(proc, 'get_cpu_percent'):
                load = proc.get_cpu_percent() * psutil.NUM_CPUS
            else:
                load = proc.cpu_percent() * psutil.cpu_count()
            if load > 90.0:
                print('FAIL %s: runaway CPU load: %f' % (pretty, load))
                result = False
                break
            time.sleep(0.3)
    finally:
        broker.terminate()
        broker.join()

    return result

# check that keeping the broker busy doesn't cause dropped connections. this is
# a regression test against a bug that involved the POLLNVAL error condition on
# polling objects.
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t15(factory):
    """
    Check that a broker with no connected clients does not burn CPU.

    Starts a broker, connects and disconnects one client, then samples the
    broker process' CPU usage ten times at 0.3 second intervals. Fails as
    soon as a scaled sample exceeds 90.0.

    Returns True on success and False if a runaway load was detected.
    """
    pretty = '%s t15' % __file__
    print(pretty)

    ws_config = {
        'root'   : factory.HOME.path,
        'env'    : [],
        'tools'  : {},
        'flocker': {
            "ftp": {
                "password": "ftpuser",
                "store": "/srv/www/flocker",
                "port": 21,
                "timeout": 30,
                "user": "ftpuser"
            },
            "host": "cnbjlx20050",
            "enable" : True,
            "http": {
                "doc-root": "/srv/www",
                "port": 80
            }
        }
    }
    factory.write_config('workspace.json', json.dumps(ws_config))

    sock, port = find_free_port()
    # set remote explicitly to avoid reading config from disk
    broker = Broker(
        ('',port), sock, remote={}, authkeys={'admin':None}, hsl_paths=[],
        home=factory.HOME.path
    )
    broker.start()

    result = True
    # from here on the broker is running: always terminate it, even if the
    # client connection or a psutil call raises
    try:
        proc   = psutil.Process(broker.pid)
        remote = RemoteBroker(address=('',port), home=factory.HOME.path)

        remote.list_available() # just to make sure the connection is up
        del remote              # client disconnects itself

        # check the CPU utilization of the broker through its PID
        for _ in range(10):
            # old psutil versions name the API get_cpu_percent()/NUM_CPUS,
            # newer ones cpu_percent()/cpu_count()
            if hasattr(proc, 'get_cpu_percent'):
                load = proc.get_cpu_percent() * psutil.NUM_CPUS
            else:
                load = proc.cpu_percent() * psutil.cpu_count()
            if load > 90.0:
                print('FAIL %s: runaway CPU load: %f' % (pretty, load))
                result = False
                break
            time.sleep(0.3)
    finally:
        broker.terminate()
        broker.join()

    return result

# check that keeping the broker busy doesn't cause dropped connections. this is
# a regression test against a bug that involved the POLLNVAL error condition on
# polling objects.