Python gym.spaces module: MultiDiscrete() usage examples

The following 23 code examples, extracted from open-source Python projects, illustrate how to use gym.spaces.MultiDiscrete().
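
Before the project snippets, here is a minimal sketch of what a MultiDiscrete space is: a fixed-length vector of independent discrete variables. Note that most examples below target older gym releases, where the constructor took a list of inclusive [min, max] pairs per component; recent gym/gymnasium releases take the number of categories per component instead. The sketch assumes a recent install.

from gym import spaces

# Recent gym / gymnasium API: one category count per component, all starting at 0.
md = spaces.MultiDiscrete([2, 2, 2, 7])   # three binary flags plus a 7-valued gauge

sample = md.sample()                      # e.g. array([1, 0, 1, 5])
assert md.contains(sample)

# Older gym API, used by most snippets below: inclusive [min, max] pairs.
# md = spaces.MultiDiscrete([[0, 1], [0, 1], [0, 1], [-3, 3]])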

Project: FFXIV-DPS    Author: TheVorpalMethod    | Project source | File source
def __init__(self, maxUmbralAstral):
        # Print debug
        self.debug = False

        # Outer bound for Astral Fire and Umbral Ice
        BLM.MAXUMBRALASTRAL = maxUmbralAstral

        # Available buffs
        self.BUFFS = []

        # Maximum time available
        self.MAXTIME = 45

        self.HELPER = BLM.Helper()

        # Available abilities
        self.ABILITIES = [
            BLM.Ability("Blizzard 1", 180, 6,  2.5,  2.49, self.HELPER.UmbralIceIncrease, BLM.DamageType.Ice, self.HELPER), #480
            BLM.Ability("Fire 1",     180, 15, 2.5,  2.49, self.HELPER.AstralFireIncrease, BLM.DamageType.Fire, self.HELPER), #1200
            BLM.Ability("Transpose",  0,   0,  0.75, 12.9, self.HELPER.SwapAstralUmbral, BLM.DamageType.Neither, self.HELPER),
            BLM.Ability("Fire 3",     240, 30, 3.5,  2.5, self.HELPER.AstralFireMax, BLM.DamageType.Fire, self.HELPER), #2400
            BLM.Ability("Blizzard 3", 240, 18, 3.5,  2.5, self.HELPER.UmbralIceMax, BLM.DamageType.Ice, self.HELPER), #2400
            BLM.Ability("Fire 4",     260, 15, 2.8,  2.5, None, BLM.DamageType.Fire, self.HELPER)] #2400

        # State including ability cooldowns, buff time remaining, mana, and Astral/Umbral
        self.initialState = np.array([0] * (len(self.ABILITIES) + len(self.BUFFS)) + [BLM.MAXMANA] + [0])

        self.state = self._reset()

        # What the learner can pick between
        self.action_space = spaces.Discrete(len(self.ABILITIES))

        # What the learner can see to make a choice (cooldowns and buffs)
        self.observation_space = spaces.MultiDiscrete([[0,180]] * (len(self.ABILITIES) + len(self.BUFFS)) + [[0, BLM.MAXMANA]] + [[-3,3]])
Project: space-wrappers    Author: ngc92    | Project source | File source
def is_compound(space):
    """ Checks whether a space is a compound space. These are non-scalar
        `Box` spaces, `MultiDiscrete`, `MultiBinary` and `Tuple` spaces
        (A Tuple space with a single, non-compound subspace is still considered
        compound).
        :raises TypeError: If the space is not a `gym.Space`.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False
    elif isinstance(space, spaces.Box):
        return len(space.shape) != 1 or space.shape[0] != 1
    elif isinstance(space, (spaces.MultiDiscrete, spaces.MultiBinary)):
        return True
    elif isinstance(space, spaces.Tuple):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
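
A quick sketch of how is_compound classifies a few standard spaces. It assumes is_compound and its assert_space dependency from the snippet above are importable; the exact module path of the space-wrappers package is not shown here.

from gym import spaces
# is_compound (and assert_space) from the snippet above are assumed in scope.

assert not is_compound(spaces.Discrete(5))                          # scalar
assert not is_compound(spaces.Box(low=0.0, high=1.0, shape=(1,)))   # one-element Box
assert is_compound(spaces.MultiBinary(4))                           # vector-valued
assert is_compound(spaces.Tuple((spaces.Discrete(2),)))             # even a single-subspace Tuple
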
Project: space-wrappers    Author: ngc92    | Project source | File source
def num_discrete_actions(space):
    """
    For a discrete space, gets the number of available actions as a tuple.
    :param gym.Space space: The discrete space to inspect.
    :return tuple: Tuple of integers containing the number of discrete actions.
    :raises TypeError: If the space is not a `gym.Space`.
    """
    assert_space(space)

    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return tuple((space.n,))
    elif isinstance(space, spaces.MultiDiscrete):
        # add +1 here as space.high is an inclusive bound
        return tuple(space.high - space.low + 1)
    elif isinstance(space, spaces.MultiBinary):
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
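
For illustration, the expected outputs on a couple of simple spaces; for a MultiDiscrete built from [min, max] pairs (older gym API) the result is high - low + 1 per component. The helpers are assumed to be in scope.

from gym import spaces
# num_discrete_actions and is_discrete from the snippets in this listing are assumed in scope.

assert num_discrete_actions(spaces.Discrete(6)) == (6,)
assert num_discrete_actions(spaces.MultiBinary(3)) == (2, 2, 2)
# For an older-API MultiDiscrete([[0, 1], [-3, 3]]) this would return (2, 7).
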
Project: reinforceflow    Author: dbobrenko    | Project source | File source
def __init__(self, env):
        if isinstance(env, six.string_types):
            env = gym.make(env)
        super(GymWrapper, self).__init__(env)
        if isinstance(env.action_space, spaces.MultiDiscrete):
            raise ValueError("Gym environments with MultiDiscrete spaces aren't supported yet.")
        self.observation_space = _to_rf_space(self.env.observation_space)
        self.action_space = _to_rf_space(self.env.action_space)
        self._obs_to_rf = _make_gym2rf_converter(self.observation_space)
        self._action_to_rf = _make_gym2rf_converter(self.action_space)
        self._action_to_gym = _make_rf2gym_converter(self.action_space)
        seed = reinforceflow.get_random_seed()
        if seed and hasattr(self.env, 'seed'):
            self.env.seed(seed)
Project: reinforceflow    Author: dbobrenko    | Project source | File source
def _make_rf2gym_converter(space):
    """Makes space converter function that maps space samples ReinforceFlow -> Gym."""
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def converter(sample):
            return np.argmax(sample)
        return converter

    if isinstance(space, spaces.MultiBinary):
        def converter(sample):
            return tuple([np.argmax(s) for s in sample])
        return converter

    if isinstance(space, spaces.Box):
        return lambda sample: sample

    if isinstance(space, spaces.Tuple):
        sub_converters = []
        for sub_space in space.spaces:
            sub_converters.append(_make_rf2gym_converter(sub_space))

        def converter(sample):
            converted_tuple = []
            for sub_sample, sub_converter in zip(sample, sub_converters):
                converted_tuple.append(sub_converter(sub_sample))
            return tuple(converted_tuple)
        return converter
    raise ValueError("Unsupported space %s." % space)
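
As a usage sketch, the Discrete branch above turns a ReinforceFlow one-hot action back into a plain Gym index via argmax; the converter function itself is assumed to be in scope.

import numpy as np
from gym import spaces
# _make_rf2gym_converter from the snippet above is assumed in scope.

to_gym = _make_rf2gym_converter(spaces.Discrete(4))
one_hot_action = np.array([0.0, 0.0, 1.0, 0.0])   # ReinforceFlow-style one-hot sample
assert to_gym(one_hot_action) == 2                # plain Gym action index
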
Project: gym-minecraft    Author: tambetm    | Project source | File source
def _take_action(self, actions):
        # if there is only one action space, it wasn't wrapped in Tuple
        if len(self.action_spaces) == 1:
            actions = [actions]

        # send appropriate command for different actions
        for spc, cmds, acts in zip(self.action_spaces, self.action_names, actions):
            if isinstance(spc, spaces.Discrete):
                logger.debug(cmds[acts])
                self.agent_host.sendCommand(cmds[acts])
            elif isinstance(spc, spaces.Box):
                for cmd, val in zip(cmds, acts):
                    logger.debug(cmd + " " + str(val))
                    self.agent_host.sendCommand(cmd + " " + str(val))
            elif isinstance(spc, spaces.MultiDiscrete):
                for cmd, val in zip(cmds, acts):
                    logger.debug(cmd + " " + str(val))
                    self.agent_host.sendCommand(cmd + " " + str(val))
            else:
                logger.warn("Unknown action space for %s, ignoring." % cmds)
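
A stand-alone sketch of the MultiDiscrete branch above: each component of the sampled action is paired with its command name and formatted as "<cmd> <value>" before being sent. The command names here are illustrative; the real ones come from self.action_names.

cmds = ["move", "turn", "jump"]        # illustrative Malmo-style command names
acts = [1, -1, 0]                      # one MultiDiscrete sample, one value per command
for cmd, val in zip(cmds, acts):
    print(cmd + " " + str(val))        # what agent_host.sendCommand() would receive
# move 1
# turn -1
# jump 0
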
Project: mushroom    Author: carloderamo    | Project source | File source
def __init__(self, name, horizon, gamma):
        """
        Constructor.

        Args:
             name (str): gym id of the environment;
             horizon (int): the horizon of the MDP;
             gamma (float): the discount factor.

        """
        self.__name__ = name

        # MDP creation
        self.env = gym.make(self.__name__)

        self.env._max_episode_steps = np.inf  # Hack to ignore gym time limit.

        # MDP properties
        assert not isinstance(self.env.observation_space,
                              gym_spaces.MultiDiscrete)
        assert not isinstance(self.env.action_space, gym_spaces.MultiDiscrete)

        action_space = self._convert_gym_space(self.env.action_space)
        observation_space = self._convert_gym_space(self.env.observation_space)
        mdp_info = MDPInfo(observation_space, action_space, gamma, horizon)

        if isinstance(action_space, Discrete):
            self._convert_action = self._convert_action_function
        else:
            self._convert_action = self._no_convert

        if isinstance(observation_space,
                      Discrete) and len(observation_space.size) > 1:
            self._convert_state = self._convert_state_function
        else:
            self._convert_state = self._no_convert

        super(Gym, self).__init__(mdp_info)
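
A hypothetical usage sketch matching the constructor signature shown above; note that the two asserts reject any environment whose observation or action space is MultiDiscrete.

# Hypothetical usage sketch (requires mushroom and gym to be installed):
mdp = Gym('CartPole-v0', horizon=200, gamma=0.99)
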
Project: space-wrappers    Author: ngc92    | Project source | File source
def test_flattened_wrapper():
    expect = gym.make("ProvideTest-v0")
    md = spaces.MultiDiscrete([(0, 1), (0, 1)])
    expect.observation_space = md
    expect.provide_observation  = (1, 1)
    wrapper = FlattenedObservationWrapper(expect)
    o, r, d, i = wrapper.step(3)
    assert wrapper.observation_space.contains(o)
    assert o == 3
Project: super_mario    Author: tsunaki00    | Project source | File source
def __init__(self):
        utils.EzPickle.__init__(self)
        self.rom_path = ''
        self.screen_height = 224
        self.screen_width = 256
        self.action_space = spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS)
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
        self.launch_vars = {}
        self.cmd_args = ['--xscale 2', '--yscale 2', '-f 0']
        self.lua_path = []
        self.subprocess = None
        self.no_render = True
        self.viewer = None

        # Pipes
        self.pipe_name = ''
        self.path_pipe_prefix = os.path.join(tempfile.gettempdir(), 'smb-fifo')
        self.path_pipe_in = ''      # Input pipe (maps to fceux out-pipe and to 'in' file)
        self.path_pipe_out = ''     # Output pipe (maps to fceux in-pipe and to 'out' file)
        self.pipe_out = None
        self.lock_out = Lock()
        self.disable_in_pipe = False
        self.disable_out_pipe = False
        self.launch_vars['pipe_name'] = ''
        self.launch_vars['pipe_prefix'] = self.path_pipe_prefix

        # Other vars
        self.is_initialized = 0     # Used to indicate fceux has been launched and is running
        self.is_exiting = 0         # Used to stop the listening thread
        self.last_frame = 0         # Last processed frame
        self.reward = 0             # Reward for last action
        self.episode_reward = 0     # Total rewards for episode
        self.is_finished = False
        self.screen = np.zeros(shape=(self.screen_height, self.screen_width, 3), dtype=np.uint8)
        self.info = {}
        self.level = 0
        self._reset_info_vars()
        self.first_step = False
        self.lock = (NesLock()).get_lock()

        # Seeding
        self.curr_seed = 0
        self._seed()
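
With the older pair-based API, spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS) describes one 0/1 flag per NES button, so an action reads as a set of pressed buttons. A sketch with illustrative button names (the real names and NUM_ACTIONS value come from the project):

import numpy as np

BUTTONS = ['up', 'left', 'down', 'right', 'A', 'B']   # illustrative names, NUM_ACTIONS = 6 assumed
action = np.array([0, 0, 0, 1, 1, 0])                 # one sample from MultiDiscrete([[0, 1]] * 6)
pressed = [b for b, flag in zip(BUTTONS, action) if flag]
print(pressed)                                        # ['right', 'A']
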
Project: noreward-rl    Author: pathak22    | Project source | File source
def __init__(self, level):
        self.previous_level = -1
        self.level = level
        self.game = DoomGame()
        self.loader = Loader()
        self.doom_dir = os.path.dirname(os.path.abspath(__file__))
        self._mode = 'algo'                         # 'algo' or 'human'
        self.no_render = False                      # To disable double rendering in human mode
        self.viewer = None
        self.is_initialized = False                 # Indicates that reset() has been called
        self.curr_seed = 0
        self.lock = (DoomLock()).get_lock()
        self.action_space = spaces.MultiDiscrete([[0, 1]] * 38 + [[-10, 10]] * 2 + [[-100, 100]] * 3)
        self.allowed_actions = list(range(NUM_ACTIONS))
        self.screen_height = 480
        self.screen_width = 640
        self.screen_resolution = ScreenResolution.RES_640X480
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
        self._seed()
        self._configure()
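
The action space above is a 43-component MultiDiscrete under the older pair API: 38 binary button flags, two deltas in [-10, 10] and three in [-100, 100]. A plain-numpy sketch of checking a hand-built action against those bounds:

import numpy as np

low  = np.array([0] * 38 + [-10] * 2 + [-100] * 3)
high = np.array([1] * 38 + [10] * 2 + [100] * 3)

action = np.zeros(43, dtype=int)
action[0] = 1            # press the first button
action[38] = -5          # one of the delta components
assert np.all((low <= action) & (action <= high))
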
Project: baselines    Author: openai    | Project source | File source
def make_pdtype(ac_space):
    from gym import spaces
    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    elif isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    elif isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    elif isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    else:
        raise NotImplementedError
Project: gym-doom    Author: ppaquette    | Project source | File source
def __init__(self, level):
        self.previous_level = -1
        self.level = level
        self.game = DoomGame()
        self.loader = Loader()
        self.doom_dir = os.path.dirname(os.path.abspath(__file__))
        self._mode = 'algo'                         # 'algo' or 'human'
        self.no_render = False                      # To disable double rendering in human mode
        self.viewer = None
        self.is_initialized = False                 # Indicates that reset() has been called
        self.curr_seed = 0
        self.lock = (DoomLock()).get_lock()
        self.action_space = spaces.MultiDiscrete([[0, 1]] * 38 + [[-10, 10]] * 2 + [[-100, 100]] * 3)
        self.allowed_actions = list(range(NUM_ACTIONS))
        self.screen_height = 480
        self.screen_width = 640
        self.screen_resolution = ScreenResolution.RES_640X480
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
        self._seed()
        self._configure()
Project: distributional_perspective_on_RL    Author: Kiwoo    | Project source | File source
def make_pdtype(ac_space):
    from gym import spaces
    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    elif isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    elif isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    elif isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    else:
        raise NotImplementedError
Project: squadgym    Author: aleSuglia    | Project source | File source
def __init__(self, env_data_filename, mode="single", max_utterance_len=20, max_game_turns=20):
        with open(env_data_filename, mode="rb") as in_file:
            self._env_data = pickle.load(in_file)
        self._mode = mode
        self._max_game_turns = max_game_turns
        self._num_tokens = len(self._env_data["id2token"])
        self._num_entities = len(self._env_data["env_data"])
        self._entities = list(self._env_data["env_data"].keys())
        self.action_space = spaces.Discrete(self._num_tokens)
        self.observation_space = spaces.MultiDiscrete([[0, self._num_tokens]] * max_utterance_len)
        self._last_entity = 0
        self._last_question = 0
        self._last_sequence = []
        self._game_turns = None
        self._game_score = 0
Project: reinforceflow    Author: dbobrenko    | Project source | File source
def _to_rf_space(space):
    """Converts Gym space instance into ReinforceFlow."""
    if isinstance(space, spaces.Discrete):
        return DiscreteOneHot(space.n)

    if isinstance(space, spaces.MultiDiscrete):
        # space.low > 0 will lead to unused first n actions.
        # return Tuple([DiscreteOneHot(n) for n in space.high])
        raise ValueError("MultiDiscrete spaces aren't supported yet.")

    if isinstance(space, spaces.MultiBinary):
        return Tuple([DiscreteOneHot(2) for _ in range(space.n)])

    if isinstance(space, spaces.Box):
        return Continious(space.low, space.high)

    if isinstance(space, spaces.Tuple):
        converted_spaces = []
        for sub_space in space.spaces:
            converted_spaces.append(_to_rf_space(sub_space))
        return Tuple(*converted_spaces)
    raise ValueError("Unsupported space %s." % space)
Project: reinforceflow    Author: dbobrenko    | Project source | File source
def _make_gym2rf_converter(space):
    """Makes converter function that maps space samples Gym -> ReinforceFlow."""
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def converter(sample):
            return one_hot(space.n, sample)
        return converter

    if isinstance(space, spaces.MultiBinary):
        def converter(sample):
            return tuple([one_hot(2, s) for s in sample])
        return converter

    if isinstance(space, spaces.Box):
        return lambda sample: sample

    if isinstance(space, spaces.Tuple):
        sub_converters = []
        for sub_space in space.spaces:
            sub_converters.append(_make_gym2rf_converter(sub_space))

        def converter(sample):
            converted_tuple = []
            for sub_sample, sub_converter in zip(sample, sub_converters):
                converted_tuple.append(sub_converter(sub_sample))
            return tuple(converted_tuple)
        return converter
    raise ValueError("Unsupported space %s." % space)
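
The Discrete branch above relies on a one_hot(n, index) helper from the project; a minimal stand-in with the assumed behaviour:

import numpy as np

def one_hot(n, index):
    """Assumed behaviour: length-n vector with a 1 at position index."""
    vec = np.zeros(n, dtype=np.float32)
    vec[index] = 1.0
    return vec

# A Gym Discrete(4) sample of 2 then becomes [0, 0, 1, 0] on the ReinforceFlow side.
assert one_hot(4, 2).tolist() == [0.0, 0.0, 1.0, 0.0]
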
Project: space-wrappers    Author: ngc92    | Project source | File source
def test_flattened_wrapper():
    expect = gym.make("ExpectTest-v0")
    md = spaces.MultiDiscrete([(0, 1), (0, 1)])
    expect.action_space = md
    expect.expectation  = (1, 1)
    wrapper = FlattenedActionWrapper(expect)
    wrapper.step(3)
Project: space-wrappers    Author: ngc92    | Project source | File source
def is_discrete(space):
    """ Checks if a space is discrete. A space is considered to
        be discrete if it is derived from Discrete, MultiDiscrete
        or MultiBinary.
        A Tuple space is discrete if it contains only discrete 
        subspaces.
        :raises TypeError: If the space is not a `gym.Space`.
    """
    assert_space(space)

    if isinstance(space, (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)):
        return True
    elif isinstance(space, spaces.Box):
        return False
    elif isinstance(space, spaces.Tuple):
        return all(map(is_discrete, space.spaces))

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
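
A quick sketch of the classification this gives for a few standard spaces, with the helpers above assumed to be in scope:

from gym import spaces
# is_discrete and assert_space from the snippet above are assumed in scope.

assert is_discrete(spaces.Discrete(3))
assert is_discrete(spaces.MultiBinary(5))
assert not is_discrete(spaces.Box(low=-1.0, high=1.0, shape=(2,)))
# A Tuple is discrete only if every subspace is:
assert is_discrete(spaces.Tuple((spaces.Discrete(2), spaces.MultiBinary(3))))
assert not is_discrete(spaces.Tuple((spaces.Discrete(2), spaces.Box(low=0.0, high=1.0, shape=(1,)))))
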
Project: gym-doom    Author: ppaquette    | Project source | File source
def __init__(self, multi_discrete, options=None):
        assert isinstance(multi_discrete, MultiDiscrete)
        self.multi_discrete = multi_discrete
        self.num_discrete_space = self.multi_discrete.num_discrete_space

        # Config 1
        if options is None:
            self.n = self.num_discrete_space + 1                # +1 for NOOP at beginning
            self.mapping = {i: [0] * self.num_discrete_space for i in range(self.n)}
            for i in range(self.num_discrete_space):
                self.mapping[i + 1][i] = self.multi_discrete.high[i]

        # Config 2
        elif isinstance(options, list):
            assert len(options) <= self.num_discrete_space
            self.n = len(options) + 1                          # +1 for NOOP at beginning
            self.mapping = {i: [0] * self.num_discrete_space for i in range(self.n)}
            for i, disc_num in enumerate(options):
                assert disc_num < self.num_discrete_space
                self.mapping[i + 1][disc_num] = self.multi_discrete.high[disc_num]

        # Config 3
        elif isinstance(options, dict):
            self.n = len(options.keys())
            self.mapping = options
            for i, key in enumerate(options.keys()):
                if i != key:
                    raise Error('DiscreteToMultiDiscrete must contain ordered keys. ' \
                                'Item {0} should have a key of "{0}", but key "{1}" found instead.'.format(i, key))
                if not self.multi_discrete.contains(options[key]):
                    raise Error('DiscreteToMultiDiscrete mapping for key {0} is ' \
                                'not contained in the underlying MultiDiscrete action space. ' \
                                'Invalid mapping: {1}'.format(key, options[key]))
        # Unknown parameter provided
        else:
            raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
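
A stand-alone sketch of the Config 1 mapping built above (options=None): index 0 is NOOP, and index i+1 pushes component i of the MultiDiscrete to its maximum.

high = [1, 1, 1]                      # e.g. a three-button MultiDiscrete with high = [1, 1, 1]
num_discrete_space = len(high)
n = num_discrete_space + 1            # +1 for NOOP at index 0
mapping = {i: [0] * num_discrete_space for i in range(n)}
for i in range(num_discrete_space):
    mapping[i + 1][i] = high[i]
print(mapping)   # {0: [0, 0, 0], 1: [1, 0, 0], 2: [0, 1, 0], 3: [0, 0, 1]}
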
Project: gym-doom    Author: ppaquette    | Project source | File source
def __init__(self, multi_discrete, options=None):
        assert isinstance(multi_discrete, MultiDiscrete)
        self.multi_discrete = multi_discrete
        self.num_discrete_space = self.multi_discrete.num_discrete_space

        if options is None:
            options = list(range(self.num_discrete_space))

        if not isinstance(options, list):
            raise Error('BoxToMultiDiscrete - Invalid parameter provided.')

        assert len(options) <= self.num_discrete_space
        self.low = np.array([self.multi_discrete.low[x] for x in options])
        self.high = np.array([self.multi_discrete.high[x] for x in options])
        self.mapping = { i: disc_num for i, disc_num in enumerate(options)}
Project: TensorArtist    Author: vacancy    | Project source | File source
def __init__(self, multi_discrete, options=None):
        super().__init__(0)

        assert isinstance(multi_discrete, MultiDiscrete)
        self.multi_discrete = multi_discrete
        self.num_discrete_space = self.multi_discrete.num_discrete_space

        # Config 1
        if options is None:
            self.n = self.num_discrete_space + 1                # +1 for NOOP at beginning
            self.mapping = {i: [0] * self.num_discrete_space for i in range(self.n)}
            for i in range(self.num_discrete_space):
                self.mapping[i + 1][i] = self.multi_discrete.high[i]

        # Config 2
        elif isinstance(options, list):
            assert len(options) <= self.num_discrete_space
            self.n = len(options) + 1                          # +1 for NOOP at beginning
            self.mapping = {i: [0] * self.num_discrete_space for i in range(self.n)}
            for i, disc_num in enumerate(options):
                assert disc_num < self.num_discrete_space
                self.mapping[i + 1][disc_num] = self.multi_discrete.high[disc_num]

        # Config 3
        elif isinstance(options, dict):
            self.n = len(list(options.keys()))
            self.mapping = options
            for i, key in enumerate(options.keys()):
                if i != key:
                    raise Error('DiscreteToMultiDiscrete must contain ordered keys. ' \
                                'Item {0} should have a key of "{0}", but key "{1}" found instead.'.format(i, key))
                if not self.multi_discrete.contains(options[key]):
                    raise Error('DiscreteToMultiDiscrete mapping for key {0} is ' \
                                'not contained in the underlying MultiDiscrete action space. ' \
                                'Invalid mapping: {1}'.format(key, options[key]))
        # Unknown parameter provided
        else:
            raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
Project: rl-teacher    Author: nottombrown    | Project source | File source
def make_pdtype(ac_space):
    from gym import spaces
    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    elif isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    elif isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    elif isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    else:
        raise NotImplementedError
Project: multiagent-particle-envs    Author: openai    | Project source | File source
def _set_action(self, action, agent, action_space, time=None):
        agent.action.u = np.zeros(self.world.dim_p)
        agent.action.c = np.zeros(self.world.dim_c)
        # process action
        if isinstance(action_space, spaces.MultiDiscrete):
            act = []
            size = action_space.high - action_space.low + 1
            index = 0
            for s in size:
                act.append(action[index:(index+s)])
                index += s
            action = act
        else:
            action = [action]

        if agent.movable:
            # physical action
            if self.discrete_action_input:
                agent.action.u = np.zeros(self.world.dim_p)
                # process discrete action
                if action[0] == 1: agent.action.u[0] = -1.0
                if action[0] == 2: agent.action.u[0] = +1.0
                if action[0] == 3: agent.action.u[1] = -1.0
                if action[0] == 4: agent.action.u[1] = +1.0
            else:
                if self.force_discrete_action:
                    d = np.argmax(action[0])
                    action[0][:] = 0.0
                    action[0][d] = 1.0
                if self.discrete_action_space:
                    agent.action.u[0] += action[0][1] - action[0][2]
                    agent.action.u[1] += action[0][3] - action[0][4]
                else:
                    agent.action.u = action[0]
            sensitivity = 5.0
            if agent.accel is not None:
                sensitivity = agent.accel
            agent.action.u *= sensitivity
            action = action[1:]
        if not agent.silent:
            # communication action
            if self.discrete_action_input:
                agent.action.c = np.zeros(self.world.dim_c)
                agent.action.c[action[0]] = 1.0
            else:
                agent.action.c = action[0]
            action = action[1:]
        # make sure we used all elements of action
        assert len(action) == 0
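
A stand-alone sketch of the MultiDiscrete branch at the top of this method: the flat action vector is cut into one segment per component, each segment of length high - low + 1 (one block of entries per sub-action).

import numpy as np

low  = np.array([0, 0])
high = np.array([4, 2])               # e.g. 5 movement choices and 3 communication choices
action = np.arange(8)                 # a flat 5 + 3 = 8 element action
segments, index = [], 0
for s in high - low + 1:
    segments.append(action[index:index + s])
    index += s
print([seg.tolist() for seg in segments])   # [[0, 1, 2, 3, 4], [5, 6, 7]]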
