Python gym module: spec() example source code

We extracted the following 30 code examples from open-source Python projects to show how to use gym.spec().
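Before the project samples, a quick orientation: gym.spec(env_id) looks up the registered EnvSpec for an environment ID without instantiating the environment, so metadata such as tags, trials, and reward_threshold can be inspected cheaply, and .make() called only when an actual env is needed. A minimal sketch, assuming the classic gym 0.9-era registry API that all of the samples below target:

import gym

# Look up the registered spec without creating the environment.
spec = gym.spec('CartPole-v0')

print(spec.id)                         # 'CartPole-v0'
print(spec.reward_threshold)           # e.g. 195.0 for CartPole-v0
print(spec.tags.get('atari', False))   # tag lookup, as in the dispatch code below

# Instantiate only when an env is actually needed.
env = spec.make()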

Project: DHP    Author: YuhangSong    | Project source | File source
def create_env(env_id, client_id, remotes, task=0, subject=None, summary_writer=None, **kwargs):
    import config
    if config.project == 'g':
        spec = gym.spec(env_id)
        if spec.tags.get('flashgames', False):
            return create_flash_env(env_id, client_id, remotes, **kwargs)
        elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
            return create_vncatari_env(env_id, client_id, remotes, **kwargs)
        else:
            # Assume atari.
            assert "." not in env_id  # universe environments have dots in names.
            return create_atari_env(env_id)
    elif config.project == 'f':
        return env_f(env_id=env_id,
                     task=task,
                     subject=subject,
                     summary_writer=summary_writer)
Project: universe    Author: openai    | Project source | File source
def configure(self, n=1, pool_size=None, episode_limit=None):
        self.n = n
        self.envs = [self.spec.make() for _ in range(self.n)]

        if pool_size is None:
            pool_size = min(len(self.envs), multiprocessing.cpu_count() - 1)
            pool_size = max(1, pool_size)

        self.worker_n = []
        m = int((self.n + pool_size - 1) / pool_size)  # ceil(n / pool_size) envs per worker
        for i in range(0, self.n, m):
            envs = self.envs[i:i+m]
            self.worker_n.append(Worker(envs, i))

        if episode_limit is not None:
            self._episode_id.episode_limit = episode_limit
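The m = int((self.n + pool_size - 1) / pool_size) line above is integer ceiling division: it splits n envs into at most pool_size contiguous chunks of m envs each, one chunk per Worker. A standalone sketch of the same partitioning, with hypothetical numbers:

def partition(n, pool_size):
    """Split range(n) into at most pool_size contiguous chunks."""
    m = -(-n // pool_size)  # ceil(n / pool_size) in pure integer arithmetic
    return [list(range(i, min(i + m, n))) for i in range(0, n, m)]

print(partition(10, 4))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]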
Project: universe    Author: openai    | Project source | File source
def gym_core_action_space(gym_core_id):
    spec = gym.spec(gym_core_id)

    if spec.id == 'CartPole-v0':
        return spaces.Hardcoded([[spaces.KeyEvent.by_name('left', down=True)],
                                 [spaces.KeyEvent.by_name('left', down=False)]])
    elif spec._entry_point.startswith('gym.envs.atari:'):
        actions = []
        env = spec.make()
        for action in env.unwrapped.get_action_meanings():
            z = 'FIRE' in action
            left = 'LEFT' in action
            right = 'RIGHT' in action
            up = 'UP' in action
            down = 'DOWN' in action
            translated = atari_vnc(up=up, down=down, left=left, right=right, z=z)
            actions.append(translated)
        return spaces.Hardcoded(actions)
    else:
        raise error.Error('Unsupported env type: {}'.format(spec.id))
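The Atari branch above is driven by the ALE action meanings reported by the underlying env. A quick way to see what it iterates over, assuming an Atari-enabled gym install:

import gym

env = gym.make('Pong-v0')
print(env.unwrapped.get_action_meanings())
# e.g. ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']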
Project: universe    Author: openai    | Project source | File source
def __init__(self, env, gym_core_id=None):
        super(GymCoreAction, self).__init__(env)

        if gym_core_id is None:
            # self.spec is None while inside of the make, so we need
            # to pass gym_core_id in explicitly there. This case will
            # be hit when instantiating by hand.
            gym_core_id = self.spec._kwargs['gym_core_id']

        spec = gym.spec(gym_core_id)
        raw_action_space = gym_core_action_space(gym_core_id)

        self._actions = raw_action_space.actions
        self.action_space = gym_spaces.Discrete(len(self._actions))

        if spec._entry_point.startswith('gym.envs.atari:'):
            self.key_state = translator.AtariKeyState(gym.make(gym_core_id))
        else:
            self.key_state = None
Project: unreal-implementation    Author: 404akhan    | Project source | File source
def make_env():
    env_spec = gym.spec('ppaquette/DoomBasic-v0')
    env_spec.id = 'DoomBasic-v0'
    env = env_spec.make()
    e = PreprocessImage(SkipWrapper(4)(ToDiscrete("minimal")(env)),
                        width=80, height=80, grayscale=True)
    return e
Project: gym    Author: openai    | Project source | File source
def score_from_file(json_file):
    """Calculate score from an episode_batch.json file"""
    with open(json_file) as f:
        results = json.load(f)

    # No scores yet saved
    if results is None:
        return None

    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_id'])

    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
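For reference, the shape of an episode_batch.json file implied by the keys read above; the values here are illustrative, not real monitor output:

example_results = {
    "env_id": "CartPole-v0",
    "episode_lengths": [200, 187, 200],
    "episode_rewards": [200.0, 187.0, 200.0],
    "episode_types": ["t", "t", "t"],          # 't' = training episode
    "timestamps": [1490000010.0, 1490000020.0, 1490000030.0],
    "initial_reset_timestamp": 1490000000.0,
}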
Project: gym    Author: openai    | Project source | File source
def benchmark_score_from_local(benchmark_id, training_dir):
    spec = gym.benchmark_spec(benchmark_id)

    directories = []
    for name, _, files in os.walk(training_dir):
        manifests = gym.monitoring.detect_training_manifests(name, files=files)
        if manifests:
            directories.append(name)

    benchmark_results = defaultdict(list)
    for training_dir in directories:
        results = gym.monitoring.load_results(training_dir)

        env_id = results['env_info']['env_id']
        benchmark_result = spec.score_evaluation(env_id, results['data_sources'], results['initial_reset_timestamps'], results['episode_lengths'], results['episode_rewards'], results['episode_types'], results['timestamps'])
        # from pprint import pprint
        # pprint(benchmark_result)
        benchmark_results[env_id].append(benchmark_result)

    return gym.benchmarks.scoring.benchmark_aggregate_score(spec, benchmark_results)
Project: AI-Fight-the-Landlord    Author: YoungGer    | Project source | File source
def score_from_file(json_file):
    """Calculate score from an episode_batch.json file"""
    with open(json_file) as f:
        results = json.load(f)

    # No scores yet saved
    if results is None:
        return None

    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_id'])

    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: AI-Fight-the-Landlord    Author: YoungGer    | Project source | File source
def benchmark_score_from_local(benchmark_id, training_dir):
    spec = gym.benchmark_spec(benchmark_id)

    directories = []
    for name, _, files in os.walk(training_dir):
        manifests = gym.monitoring.detect_training_manifests(name, files=files)
        if manifests:
            directories.append(name)

    benchmark_results = defaultdict(list)
    for training_dir in directories:
        results = gym.monitoring.load_results(training_dir)

        env_id = results['env_info']['env_id']
        benchmark_result = spec.score_evaluation(env_id, results['data_sources'], results['initial_reset_timestamps'], results['episode_lengths'], results['episode_rewards'], results['episode_types'], results['timestamps'])
        # from pprint import pprint
        # pprint(benchmark_result)
        benchmark_results[env_id].append(benchmark_result)

    return gym.benchmarks.scoring.benchmark_aggregate_score(spec, benchmark_results)
Project: gym-adv    Author: lerrel    | Project source | File source
def benchmark_score_from_local(benchmark_id, training_dir):
    spec = gym.benchmark_spec(benchmark_id)

    directories = []
    for name, _, files in os.walk(training_dir):
        manifests = gym.monitoring.detect_training_manifests(name, files=files)
        if manifests:
            directories.append(name)

    benchmark_results = defaultdict(list)
    for training_dir in directories:
        results = gym.monitoring.load_results(training_dir)

        env_id = results['env_info']['env_id']
        benchmark_result = spec.score_evaluation(env_id, results['data_sources'], results['initial_reset_timestamps'], results['episode_lengths'], results['episode_rewards'], results['episode_types'], results['timestamps'])
        # from pprint import pprint
        # pprint(benchmark_result)
        benchmark_results[env_id].append(benchmark_result)

    return gym.benchmarks.scoring.benchmark_aggregate_score(spec, benchmark_results)
Project: DHP    Author: YuhangSong    | Project source | File source
def __init__(self, env_id, task, subject=None, summary_writer=None):

        self._episode_reward = 0
        self._episode_length = 0

        class nnn():
            """Minimal stand-in exposing .n, mimicking gym's Discrete action space."""
            def __init__(self, n):
                self.n = n
        import config
        self.action_space = nnn(config.direction_num)

        self.env_id = env_id
        import envs_li
        self.env_li = envs_li.env_li(env_id=env_id,
                                     task=task,
                                     subject=subject,
                                     summary_writer=summary_writer)

        '''observation_space'''
        from config import observation_space
        self.observation_space = observation_space

        '''wrapper shim so this env matches the original gym env interface'''

        '''env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')'''

        class spec():
            def __init__(self, env_li):
                class tags():
                    def __init__(self, env_li):
                        self.env_li = env_li
                    def get(self, get_str):
                        if get_str == 'wrapper_config.TimeLimit.max_episode_steps':
                            return self.env_li.step_total
                        else:
                            raise NotImplementedError(get_str)
                self.tags = tags(env_li)
        self.spec = spec(self.env_li)
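The point of the shim is duck typing: downstream wrapper code keeps calling env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps') exactly as it would on a real gym env. A self-contained sketch of the same idea, with a hypothetical step_total value:

class _Tags:
    # Mirrors the shim above: answer only the key the wrappers ask for.
    def __init__(self, step_total):
        self._step_total = step_total

    def get(self, key, default=None):
        if key == 'wrapper_config.TimeLimit.max_episode_steps':
            return self._step_total
        return default

class _Spec:
    def __init__(self, step_total):
        self.tags = _Tags(step_total)

spec = _Spec(step_total=1000)  # hypothetical episode-step budget
print(spec.tags.get('wrapper_config.TimeLimit.max_episode_steps'))  # 1000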
Project: universe    Author: openai    | Project source | File source
def __init__(self, env_id):
        self.worker_n = None

        # Pull the relevant info from a transient env instance
        self.spec = gym.spec(env_id)
        env = self.spec.make()

        current_metadata = self.metadata
        self.metadata = env.metadata.copy()
        self.metadata.update(current_metadata)

        self.action_space = env.action_space
        self.observation_space = env.observation_space
        self.reward_range = env.reward_range
Project: universe    Author: openai    | Project source | File source
def __init__(self, env, gym_core_id=None):
        super(GymCoreObservation, self).__init__(env)

        if gym_core_id is None:
            # self.spec is None while inside of the make, so we need
            # to pass gym_core_id in explicitly there. This case will
            # be hit when instantiating by hand.
            gym_core_id = self.spec._kwargs['gym_core_id']

        self._reward_n = None
        self._done_n = None
        self._info_n = None

        self._gym_core_env = gym.spec(gym_core_id).make()
Project: universe    Author: openai    | Project source | File source
def WrappedGymCoreSyncEnv(gym_core_id, fps=60, rewarder_observation=False):
    spec = gym.spec(gym_core_id)
    env = gym_core_sync.GymCoreSync(BlockingReset(wrap(envs.VNCEnv(fps=fps))))
    if rewarder_observation:
        env = GymCoreObservation(env, gym_core_id=gym_core_id)
    elif spec._entry_point.startswith('gym.envs.atari:'):
        env = CropAtari(env)

    return env
Project: universe    Author: openai    | Project source | File source
def __init__(self, gym_core_id, fps=60, vnc_pixels=True):
        super(GymCoreSyncEnv, self).__init__(gym_core_id, fps=fps)
        # Metadata has already been cloned
        self.metadata['semantics.async'] = False

        self.gym_core_id = gym_core_id
        self.vnc_pixels = vnc_pixels

        if not vnc_pixels:
            self._core_env = gym.spec(gym_core_id).make()
        else:
            self._core_env = None
Project: universe    Author: openai    | Project source | File source
def test_nice_vnc_semantics_match(spec, matcher, wrapper):
    # Check that when running over VNC or using the raw environment,
    # semantics match exactly.
    gym.undo_logger_setup()
    logging.getLogger().setLevel(logging.INFO)

    spaces.seed(0)

    vnc_env = spec.make()
    if vnc_env.metadata.get('configure.required', False):
        vnc_env.configure(remotes=1)
    vnc_env = wrapper(vnc_env)
    vnc_env = wrappers.Unvectorize(vnc_env)

    env = gym.make(spec._kwargs['gym_core_id'])

    env.seed(0)
    vnc_env.seed(0)

    # Check that reset observations work
    reset(matcher, env, vnc_env, stage='initial reset')

    # Check a full rollout
    rollout(matcher, env, vnc_env, timestep_limit=50, stage='50 steps')

    # Reset to start a new episode
    reset(matcher, env, vnc_env, stage='reset to new episode')

    # Check that a step into the next episode works
    rollout(matcher, env, vnc_env, timestep_limit=1, stage='1 step in new episode')

    # Make sure env can be reseeded
    env.seed(1)
    vnc_env.seed(1)
    reset(matcher, env, vnc_env, 'reseeded reset')
    rollout(matcher, env, vnc_env, timestep_limit=1, stage='reseeded step')
Project: gym    Author: openai    | Project source | File source
def score_from_remote(url):
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    env_id = parsed['env_id']

    spec = gym.spec(env_id)
    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: gym    Author: openai    | Project source | File source
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.load_results(directory)
    # No scores yet saved
    if results is None:
        return None

    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])

    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: trpo    Author: jjkke88    | Project source | File source
def __init__(self, args, task_q, result_q, actor_id, monitor):
        multiprocessing.Process.__init__(self)
        self.actor_id = actor_id
        self.task_q = task_q
        self.result_q = result_q
        self.args = args
        self.monitor = monitor
        # pms.max_path_length = gym.spec(args.environment_name).timestep_limit
Project: trpo    Author: jjkke88    | Project source | File source
def __init__(self, args, task_q, result_q, actor_id, monitor):
        multiprocessing.Process.__init__(self)
        self.actor_id = actor_id
        self.task_q = task_q
        self.result_q = result_q
        self.args = args
        self.monitor = monitor
        # pms.max_path_length = gym.spec(args.environment_name).timestep_limit
Project: FeatureControlHRL    Author: Nat-D    | Project source | File source
def create_env(env_id, client_id, remotes, **kwargs):
    spec = gym.spec(env_id)

    if spec.tags.get('flashgames', False):
        return create_flash_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return create_vncatari_env(env_id, client_id, remotes, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id)
Project: noreward-rl    Author: pathak22    | Project source | File source
def create_env(env_id, client_id, remotes, **kwargs):
    if 'doom' in env_id.lower() or 'labyrinth' in env_id.lower():
        return create_doom(env_id, client_id, **kwargs)
    if 'mario' in env_id.lower():
        return create_mario(env_id, client_id, **kwargs)

    spec = gym.spec(env_id)
    if spec.tags.get('flashgames', False):
        return create_flash_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return create_vncatari_env(env_id, client_id, remotes, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id, **kwargs)
Project: universe-starter-agent    Author: openai    | Project source | File source
def create_env(env_id, client_id, remotes, **kwargs):
    spec = gym.spec(env_id)

    if spec.tags.get('flashgames', False):
        return create_flash_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return create_vncatari_env(env_id, client_id, remotes, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id)
Project: AI-Fight-the-Landlord    Author: YoungGer    | Project source | File source
def score_from_remote(url):
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    env_id = parsed['env_id']

    spec = gym.spec(env_id)
    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: AI-Fight-the-Landlord    Author: YoungGer    | Project source | File source
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.load_results(directory)
    # No scores yet saved
    if results is None:
        return None

    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])

    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: a3c-tensorflow    Author: carpedm20    | Project source | File source
def create_env(env_id, client_id, **kwargs):
  spec = gym.spec(env_id)

  # Assume atari.
  assert "." not in env_id  # universe environments have dots in names.
  return create_atari_env(env_id)
Project: gym-adv    Author: lerrel    | Project source | File source
def score_from_remote(url):
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    env_id = parsed['env_id']

    spec = gym.spec(env_id)
    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: gym-adv    Author: lerrel    | Project source | File source
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.monitor.load_results(directory)
    # No scores yet saved
    if results is None:
        return None

    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])

    return score_from_merged(episode_lengths, episode_rewards, episode_types, timestamps, initial_reset_timestamp, spec.trials, spec.reward_threshold)
Project: feudal_networks    Author: dmakian    | Project source | File source
def create_env(env_id, client_id, remotes, **kwargs):
    spec = gym.spec(env_id)

    if spec.tags.get('feudal', False):
        return create_feudal_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('flashgames', False):
        return create_flash_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return create_vncatari_env(env_id, client_id, remotes, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id)
Project: RL-Universe    Author: Bifrost-Research    | Project source | File source
def create_env(env_id, client_id, remotes, **kwargs):
    spec = gym.spec(env_id)

    if spec.tags.get('flashgames', False):
        return create_flash_env(env_id, client_id, remotes, **kwargs)
    elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return create_vncatari_env(env_id, client_id, remotes, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id)
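All of the create_env variants above share the same tag-dispatch idiom. A compact, self-contained version of just the decision logic, written as a hypothetical helper with no universe dependency:

import gym

def env_kind(env_id):
    """Classify an env the way the create_env samples above do."""
    spec = gym.spec(env_id)
    if spec.tags.get('flashgames', False):
        return 'flash'
    if spec.tags.get('atari', False) and spec.tags.get('vnc', False):
        return 'vnc-atari'
    assert "." not in env_id  # universe environments have dots in names
    return 'atari'  # assume plain Atari, as the samples do

print(env_kind('PongDeterministic-v0'))  # 'atari' under a plain gym registry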