import os.path as osp
import time
from typing import Optional, Tuple
from dataclasses import dataclass

import numpy as np
import gym.spaces as spaces
import torch
import jactorch
# import hacl.pdsketch as pds
# import hacl.pdsketch.rl as pdsrl
# import hacl.envs.gridworld.minigrid.gym_minigrid as minigrid
# from .gym_minigrid.path_finding import find_path_to_obj

import concepts.benchmark.gridworld.minigrid.gym_minigrid as minigrid
from concepts.benchmark.gridworld.minigrid.gym_minigrid.minigrid import MiniGridEnv
from concepts.benchmark.gridworld.minigrid.gym_minigrid.path_finding import find_path_to_obj

from import State, Domain
from import PDSketchExecutor

__all__ = [
    'MiniGridEnvV20220407', 'make_minigrid_env', 'get_minigrid_domain_filename',
    'visualize_minigrid_planner', 'visualize_minigrid_plan'

def _map_int(x):
    if isinstance(x, tuple):
        return map(int, x)
    if isinstance(x, np.ndarray):
        return map(int, x)
    if isinstance(x, torch.Tensor):
        return map(int, jactorch.as_numpy(x))

[docs] @dataclass class MiniGridEnvAction(object): name: str arguments: Tuple[int, ...] = tuple()
[docs] class MiniGridEnvV20220407(MiniGridEnv): SUPPORTED_TASKS = ['gotosingle', 'goto', 'goto2', 'pickup', 'open', 'generalization'] SUPPORTED_ENCODING = ['full']
[docs] def __init__(self, task='pickup', encoding: str = 'full'): assert task in type(self).SUPPORTED_TASKS, f'Unknown task: {task}.' assert encoding in type(self).SUPPORTED_ENCODING, f'Unknown encoding: {encoding}.' self.task = task self.encoding = encoding self.encoding_executor = None self.options = dict() self.goal_obj: Optional[minigrid.WorldObj] = None self.goal_pose: Optional[Tuple[int, int]] = None self.mission: str = '' super().__init__(grid_size=7, max_steps=64, seed=1337, require_obs=False)
action_space: spaces.Discrete observation_space: spaces.Box task: str """A short string describing the task.""" encoding: str """A short string describing the encoding method.""" encoding_executor: Optional[PDSketchExecutor] """The :class:`` used for encoding the states.""" goal_obj: Optional[minigrid.WorldObj] """The goal object.""" goal_pose: Optional[Tuple[int, int]] """The goal pose.""" mission: str """The mission string."""
[docs] def set_options(self, **kwargs): self.options.update(kwargs)
[docs] def get_option(self, name, default=None): return self.options.get(name, default)
[docs] def set_encoding_executor(self, executor: PDSketchExecutor): self.encoding_executor = executor
@property def encoding_domain(self) -> Domain: return self.encoding_executor.domain def _gen_grid(self, width, height): if self.task == 'gotosingle': _gen_grid_goto_single(self, width, height) elif self.task in ('goto', 'goto2'): _gen_grid_goto(self, width, height) elif self.task == 'pickup': _gen_grid_pickup(self, width, height) elif self.task == 'open': _gen_grid_open(self, width, height) elif self.task == 'generalization': _gen_grid_generalization(self, width, height) else: raise ValueError(f'Unknown task: {self.task}.')
[docs] def reset(self): super().reset() return self.compute_obs()
[docs] def step(self, action: MiniGridEnvAction): if == 'move': self.step_move_to(action.arguments[0], action.arguments[1]) elif == 'forward': self.step_forward() elif == 'lturn': self.step_lturn() elif == 'rturn': self.step_rturn() elif == 'pickup': self.step_pickup() elif == 'place': self.step_drop() elif == 'toggle': self.step_inner(self.Actions.toggle) else: raise ValueError(f'Unknown action: {action}.') obs = self.compute_obs() done = self.compute_done() return obs, -1, done, {}
[docs] def compute_obs(self): state = self.get_pds_state() return {'state': state, 'mission': self.mission}
[docs] def compute_done(self): if self.task in ('goto', 'goto2', 'gotosingle'): fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell is not None and fwd_cell.type == self.goal_obj.type and fwd_cell.color == self.goal_obj.color: return True elif self.task == 'pickup': if self.carrying is not None and self.carrying.color == self.goal_obj.color and self.carrying.type == self.goal_obj.type: return True elif self.task == 'open': for _, _, obj in self.iter_objects(): if obj.color == self.goal_obj.color and obj.type == self.goal_obj.type and obj.is_open: return True elif self.task == 'generalization': if self.carrying is not None and self.carrying.color == self.goal_obj[0].color and self.carrying.type == self.goal_obj[0].type: fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell is not None and fwd_cell.type == self.goal_obj[1].type and fwd_cell.color == self.goal_obj[1].color: return True else: raise ValueError(f'Unknown task: {self.task}.') return False
[docs] def step_inner(self, action): super().step(action)
[docs] def step_move_to(self, pose, dir, traj=None): x, y = _map_int(pose) dir, = _map_int(dir) if self.grid.get(x, y) is None or self.grid.get(x, y).can_overlap(): self.agent_pos = (x, y) self.agent_dir = dir
[docs] def step_pickup(self): fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell and fwd_cell.can_pickup(): if self.carrying is None: self.carrying = fwd_cell self.carrying.cur_pos = (-1, -1) self.grid.set(*fwd_pos, None)
[docs] def step_forward(self): fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell is None or fwd_cell.can_overlap(): self.agent_pos = tuple(fwd_pos)
[docs] def step_lturn(self): self.agent_dir = (self.agent_dir - 1 + 4) % 4
[docs] def step_rturn(self): self.agent_dir = (self.agent_dir + 1) % 4
[docs] def step_drop(self): fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell is None and self.carrying: self.grid.set(fwd_pos[0], fwd_pos[1], self.carrying) self.carrying.cur_pos = tuple(fwd_pos) self.carrying = None
[docs] def step_toggle(self): fwd_pos = self.front_pos fwd_cell = self.grid.get(*fwd_pos) if fwd_cell: fwd_cell.toggle(self, fwd_pos)
[docs] def get_pds_state(self) -> State: if self.encoding_executor is None: raise RuntimeError(f'Encoding executor is not set. Use set_encoding_executor() to set it.') if self.encoding == 'full': return _get_pds_state_full(self, self.encoding_executor, ignore_walls=False) elif self.encoding == 'basic': return _get_pds_state_basic(self, self.encoding_executor, ignore_walls=False) else: raise ValueError(f'Unknown encoding: {self.encoding}.')
[docs] def debug_print(self): print(self)
def _get_pds_state_full(env: MiniGridEnvV20220407, executor: PDSketchExecutor, ignore_walls: bool = False, include_extra_predicates: bool = True): """Encode the environment state into a :class:`` object. Args: env: the environment. executor: the executor. ignore_walls: whether to ignore walls. include_extra_predicates: whether to include extra predicates, including pickable, toggleable, robot-holding. """ domain = executor.domain object_names = {'r': domain.types['robot']} object_type2id = dict() for k in minigrid.OBJECT_TO_IDX: object_type2id[k] = 0 object_images = list() object_poses = list() objects = list() for x, y, obj in env.iter_objects(): if ignore_walls and obj.type == 'wall': continue if not hasattr(obj, 'name'): = f'{obj.type}:{object_type2id[obj.type]}' object_names[] = domain.types['item'] object_images.append(obj.encode()) object_poses.append((x, y)) object_type2id[obj.type] += 1 objects.append(obj) state, ctx = executor.new_state(object_names, create_context=True) if include_extra_predicates: predicates = list() for obj in objects: if obj.type == 'wall': pass else: predicates.append(ctx.pickable( if obj.type == 'door': predicates.append(ctx.toggleable( if env.carrying is not None: predicates.append(ctx.robot_holding('r', ctx.define_predicates(predicates) ctx.define_feature('robot-pose', torch.tensor([env.agent_pos], dtype=torch.float32)) ctx.define_feature('robot-direction', torch.tensor([[env.agent_dir]], dtype=torch.int64)) ctx.define_feature('item-pose', torch.tensor(object_poses, dtype=torch.float32)) ctx.define_feature('item-image', torch.tensor(object_images, dtype=torch.float32)) return state def _get_pds_state_basic(env: MiniGridEnvV20220407, executor: PDSketchExecutor, ignore_walls: bool = False): """Encode the environment state into a :class:`` object. Args: env: the environment. executor: the executor. ignore_walls: whether to ignore walls. """ domain = executor.domain object_names = {'r': domain.types['robot']} object_type2id = dict() for k in minigrid.OBJECT_TO_IDX: object_type2id[k] = 0 robot_images = list() robot_images.append(env.agent_pos + (env.agent_dir, )) object_images = list() for x, y, obj in env.iter_objects(): if ignore_walls and obj.type == 'wall': continue if not hasattr(obj, 'name'): = f'{obj.type}:{object_type2id[obj.type]}' object_names[] = domain.types['item'] object_images.append(obj.encode() + (x, y)) object_type2id[obj.type] += 1 state, ctx = executor.new_state(object_names, create_context=True) ctx.define_feature('robot-image', torch.tensor(robot_images, dtype=torch.float32)) ctx.define_feature('item-image', torch.tensor(object_images, dtype=torch.float32)) return state
[docs] def make_minigrid_env(*args, **kwargs): return MiniGridEnvV20220407(*args, **kwargs)
[docs] def get_minigrid_domain_filename(encoding: str = 'full') -> str: """Get the domain filename of the crafting world.""" return osp.join(osp.dirname(__file__), 'pds_domains', f'minigrid-domain-v20220407-{encoding}.pdsketch')
[docs] def visualize_minigrid_planner(env: MiniGridEnvV20220407, planner): torch.set_grad_enabled(False) while True: init_obs = env.reset() state, mission = init_obs['state'], init_obs['mission'] assert planner is not None plan = planner(state, mission) cmd = visualize_minigrid_plan(env, plan) if cmd == 'q': break
[docs] def visualize_minigrid_plan(env: MiniGridEnvV20220407, plan): env.render() print('Plan: ' + ', '.join([str(x) for x in plan])) print('Press <Enter> to visualize.') _ = input('> ').strip() for action in plan: print('Executing action: ' + str(action)) if == 'move': pose = action.arguments[1].tensor.tolist() dir = action.arguments[2].tensor.item() for action in minigrid.find_path(env, pose, dir): env.step_inner(action) env.render() time.sleep(0.5) elif == 'forward': env.step_inner(MiniGridEnvV20220407.Actions.forward) elif == 'lturn': env.step_inner(MiniGridEnvV20220407.Actions.left) elif == 'rturn': env.step_inner(MiniGridEnvV20220407.Actions.right) elif == 'pickup': env.step_inner(MiniGridEnvV20220407.Actions.pickup) elif == 'toggle': env.step_inner(MiniGridEnvV20220407.Actions.toggle) else: raise NotImplementedError(action) env.render() time.sleep(0.5) print('Visualization finished.') print('Press <Enter> to continue. Type q to quit.') cmd = input('> ').strip() return cmd
def _gen_basic_room(env, width, height): env.grid = minigrid.Grid(width, height) env.agent_pos = (3, 3) env.agent_dir = 0 env.grid.horz_wall(0, 0, 7) env.grid.horz_wall(0, 6, 7) env.grid.vert_wall(0, 0, 7) env.grid.vert_wall(6, 0, 7) def _gen_grid_goto_single(env, width, height): _gen_basic_room(env, width, height) objects = list() object_poses = list() for i in range(1): shape = env.np_random.choice([minigrid.Key, minigrid.Box, minigrid.Ball]) color = env.np_random.choice(minigrid.COLOR_NAMES) while True: pose = env.np_random.integers(1, 6, size=2) if env.grid.get(*pose) is None and not np.all(pose == 3) and not np.all(pose == (4, 3)): # not initially facing. break this_object = shape(color) objects.append(this_object) object_poses.append(pose) env.grid.set(*pose, this_object) env.goal_obj = goal = env.np_random.choice(objects) env.goal_pose = object_poses[objects.index(goal)] env.mission = f'(exists (?o - item) (and (robot-is-facing r ?o) (is-{goal.type} ?o)))' def _gen_grid_goto(env, width, height): for _ in range(env.get_option('max_trials', 100)): _gen_basic_room(env, width, height) objects = list() object_poses = list() for i in range(env.get_option('nr_objects', 4)): shape = env.np_random.choice([minigrid.Key, minigrid.Box, minigrid.Ball]) color = env.np_random.choice(minigrid.COLOR_NAMES) while True: pose = env.np_random.integers(1, 6, size=2) if env.grid.get(*pose) is None and not np.all(pose == 3) and not np.all(pose == (4, 3)): # not initially facing. break this_object = shape(color) objects.append(this_object) object_poses.append(pose) env.grid.set(*pose, this_object) env.goal_obj = goal = env.np_random.choice(objects) env.goal_pose = object_poses[objects.index(goal)] env.mission = f'(exists (?o - item) (and (robot-is-facing r ?o) (is-{goal.type} ?o) (is-{goal.color} ?o)))' path = find_path_to_obj(env, tuple(env.goal_pose)) if path is not None: break def _gen_grid_pickup(env, width, height): for _ in range(env.get_option('max_trials', 100)): _gen_basic_room(env, width, height) objects = list() object_poses = list() for i in range(env.get_option('nr_objects', 4)): shape = env.np_random.choice([minigrid.Key, minigrid.Box, minigrid.Ball]) color = env.np_random.choice(minigrid.COLOR_NAMES) while True: pose = env.np_random.integers(1, 6, size=2) if env.grid.get(*pose) is None and not np.all(pose == 3): break this_object = shape(color) objects.append(this_object) object_poses.append(pose) env.grid.set(*pose, this_object) env.goal_obj = goal = env.np_random.choice(objects) env.goal_pose = object_poses[objects.index(goal)] env.mission = f'(exists (?o - item) (and (robot-holding r ?o) (is-{goal.type} ?o) (is-{goal.color} ?o)))' path = find_path_to_obj(env, tuple(env.goal_pose)) if path is not None: break def _gen_grid_open(env, width, height): _gen_basic_room(env, width, height) objects = list() for i in range(4): color = env.np_random.choice(minigrid.COLOR_NAMES) while True: pose = env.np_random.integers(1, 6) dir = i # dir = env.np_random.integers(0, 4) if dir == 0: pose = (pose, 0) elif dir == 1: pose = (pose, 6) elif dir == 2: pose = (0, pose) elif dir == 3: pose = (6, pose) if env.grid.get(*pose).type != 'door': break this_object = minigrid.Door(color) objects.append(this_object) env.grid.set(*pose, this_object) env.goal_obj = goal = env.np_random.choice(objects) env.mission = f'(exists (?o - item) (and (is-open ?o) (is-{goal.color} ?o)))' def _gen_grid_generalization(env, width, height): for _ in range(env.get_option('max_trials', 100)): _gen_basic_room(env, width, height) objects = list() object_poses = list() for i in range(env.get_option('nr_objects', 4)): shape = env.np_random.choice([minigrid.Key, minigrid.Box, minigrid.Ball]) color = env.np_random.choice(minigrid.COLOR_NAMES) while True: pose = env.np_random.integers(1, 6, size=2) if env.grid.get(*pose) is None and not np.all(pose == 3): break this_object = shape(color) objects.append(this_object) object_poses.append(pose) env.grid.set(*pose, this_object) env.goal_obj = goal = env.np_random.choice(objects, size=2, replace=False) env.goal_pose = object_poses[objects.index(goal[0])], object_poses[objects.index(goal[1])] env.mission = f"""(and (exists (?o - item) (and (robot-holding r ?o) (is-{goal[0].type} ?o) (is-{goal[0].color} ?o))) (exists (?o - item) (and (robot-is-facing r ?o) (is-{goal[1].type} ?o) (is-{goal[1].color} ?o))) )""" path = find_path_to_obj(env, tuple(env.goal_pose[0])) if path is not None: break path = find_path_to_obj(env, tuple(env.goal_pose[1])) if path is not None: break