Source code for concepts.benchmark.gridworld.crafting_world.pds_domains.crafting_world_pdsinterface

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crafting_world_pdsinterface.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 05/05/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from typing import Dict, Sequence

from concepts.dm.pdsketch.domain import Domain
from concepts.dm.pdsketch.operator import OperatorApplier
from concepts.benchmark.gridworld.crafting_world.crafting_world_env import CraftingWorldSimulator
from concepts.dm.pdsketch.strips.strips_expression import SStateDict

# Public names of this module (what `from ... import *` exposes). The private
# helper `_find_string_start_with` is intentionally not listed.
__all__ = ['CraftingWorldPDDLExecutor', 'reset_simulator_from_state']


class CraftingWorldPDDLExecutor(object):
    """Replay grounded PDSketch operators on a crafting-world simulator.

    Each operator applier is translated, by operator name, into the matching
    simulator call (``move_right``, ``move_left``, ``move_to``, ``pick_up``,
    ``place_down``, ``mine`` or ``craft``). Argument names are decoded by their
    prefix: names starting with ``"i"`` are inventory slots and names starting
    with ``"o"`` are objects; the numeric index is whatever follows the first
    character.
    """

    def __init__(self, domain: Domain, simulator: CraftingWorldSimulator):
        """Store the planning domain and the simulator to drive.

        Args:
            domain: the domain whose operator definitions are inspected when
                executing ``craft*`` actions.
            simulator: the simulator on which actions are executed.
        """
        self.domain = domain
        self.simulator = simulator

    def execute(self, plan: Sequence[OperatorApplier]):
        """Execute the actions of ``plan`` in order, stopping at the first failure.

        Args:
            plan: the sequence of grounded operators to execute.

        Returns:
            ``None`` if every action succeeded (including an empty plan),
            otherwise the 0-based index of the first action that failed.
            Earlier versions computed this index but discarded it, so the
            method always returned ``None``.
        """
        for i, action in enumerate(plan):
            if not self.step(action):
                return i
        return None

    def step(self, action: OperatorApplier) -> bool:
        """Execute a single grounded operator on the simulator.

        Args:
            action: the operator applier; only ``action.operator.name`` and
                ``action.arguments`` are read.

        Returns:
            True if the action was dispatched and the simulator did not report
            a failure; False if the operator name is not recognized, the
            arguments could not be resolved unambiguously, ``pick_up`` raised
            ``KeyError``, or ``mine`` / ``craft`` returned a falsy value.
        """
        action_name = action.operator.name
        action_args = action.arguments

        if action_name == "move-right":
            self.simulator.move_right()
        elif action_name == "move-left":
            self.simulator.move_left()
        elif action_name == "move-to":
            # The second argument is the destination; strip its one-letter prefix.
            self.simulator.move_to(int(action_args[1][1:]))
        elif action_name == "pick-up":
            inventory_index = int(_find_string_start_with(action_args, "i", first=True)[1:])
            object_name = _find_string_start_with(action_args, "o", first=True)
            try:
                self.simulator.pick_up(inventory_index, object_name)
            except KeyError as e:
                print(f' pick-up {action_args} failed. Reason: {e}')
                return False
        elif action_name == "place-down":
            self.simulator.place_down(int(_find_string_start_with(action_args, "i", first=True)[1:]))
        elif action_name.startswith('mine'):
            resolved = self._resolve_production_arguments(action_args)
            if resolved is None:
                return False
            inventory_indices, hypothetical_object, empty_inventory, target_object = resolved

            # Any listed inventory slot other than the empty one is treated as the tool.
            tool_inventory = list(set(inventory_indices) - {empty_inventory})
            rv = self.simulator.mine(
                target_object, empty_inventory, hypothetical_object,
                tool_inventory=tool_inventory[0] if len(tool_inventory) > 0 else None
            )
            if not rv:
                return False
        elif action_name.startswith('craft'):
            resolved = self._resolve_production_arguments(action_args)
            if resolved is None:
                return False
            inventory_indices, hypothetical_object, empty_inventory, target_object = resolved

            # Every listed inventory slot other than the empty one holds an ingredient.
            ingredients = list(set(inventory_indices) - {empty_inventory})
            target_type = self._lookup_craft_target_type(action_name, action_args, hypothetical_object)
            rv = self.simulator.craft(
                target_object, empty_inventory, hypothetical_object,
                ingredients_inventory=ingredients, target_type=target_type
            )
            if not rv:
                return False
        else:
            return False
        return True

    def _resolve_production_arguments(self, action_args):
        """Resolve the arguments shared by ``mine*`` and ``craft*`` actions.

        Returns:
            A tuple ``(inventory_indices, hypothetical_object, empty_inventory,
            target_object)``, or ``None`` unless the arguments contain exactly
            one object listed in ``simulator.hypothetical``, exactly one
            inventory slot whose content is ``None``, and exactly one object
            located at the agent's current position.
        """
        inventory_indices = [int(x[1:]) for x in _find_string_start_with(action_args, "i")]
        object_names = _find_string_start_with(action_args, "o")

        hypothetical_objects = [x for x in object_names if x in self.simulator.hypothetical]
        if len(hypothetical_objects) != 1:
            return None

        empty_inventories = [x for x in inventory_indices if self.simulator.inventory[x] is None]
        if len(empty_inventories) != 1:
            return None

        # simulator.objects maps a name to a tuple whose second entry is the location.
        target_objects = [
            x for x in object_names
            if x in self.simulator.objects and self.simulator.objects[x][1] == self.simulator.agent_pos
        ]
        if len(target_objects) != 1:
            return None

        return inventory_indices, hypothetical_objects[0], empty_inventories[0], target_objects[0]

    def _lookup_craft_target_type(self, action_name, action_args, hypothetical_object):
        """Find the type a ``craft*`` operator assigns to its hypothetical object.

        Scans the operator's effects for an assignment that sets
        ``object-of-type(<hypothetical variable>, <object constant>)`` to the
        constant 1 and returns the name of that object constant, or ``None``
        if no such effect exists.
        """
        operator = self.domain.operators[action_name]
        hypothetical_object_index = action_args.index(hypothetical_object)
        hypothetical_object_varname = operator.arguments[hypothetical_object_index].name

        for effect in operator.effects:
            assign_expr = effect.assign_expr
            if (
                assign_expr.predicate.function.name == 'object-of-type' and
                assign_expr.predicate.arguments[0].name == hypothetical_object_varname and
                assign_expr.predicate.arguments[1].__class__.__name__ == 'ObjectConstantExpression' and
                assign_expr.value.__class__.__name__ == 'ConstantExpression' and
                assign_expr.value.constant.item() == 1
            ):
                return assign_expr.predicate.arguments[1].name
        return None
def reset_simulator_from_state(simulator: CraftingWorldSimulator, objects: Dict[str, Sequence[str]], state: SStateDict):
    """Overwrite the simulator's world state from a STRIPS-style state dictionary.

    Args:
        simulator: the simulator to reset; its ``agent_pos``, ``nr_grids``,
            ``nr_inventory``, ``objects``, ``inventory`` and ``hypothetical``
            attributes are replaced.
        objects: mapping from type name to object names. Only the lengths of
            ``objects['tile']`` and ``objects['inventory']`` are used.
        state: mapping from predicate name to a collection of argument tuples.
            ``'agent-at'`` and ``'object-of-type'`` are required;
            ``'object-at'`` and ``'inventory-holding'`` are optional.

    Raises:
        AssertionError: if an object that is placed or held has no
            ``'object-of-type'`` entry.
    """
    # Names carry a one-letter prefix followed by their numeric index.
    agent_at = list(state['agent-at'])[0][0]
    simulator.agent_pos = int(agent_at[1:])
    simulator.nr_grids = len(objects['tile'])
    simulator.nr_inventory = nr_inventory = len(objects['inventory'])

    simulator.objects = dict()
    simulator.inventory = {i: None for i in range(1, 1 + nr_inventory)}
    # Start from a clean slate so hypothetical objects left over from a previous
    # state do not leak into this one when a simulator is reused.
    simulator.hypothetical = set()

    # One pass over the type facts instead of rescanning them for every object.
    # The first type seen for a name wins, for placed and held objects alike.
    type_of = dict()
    for obj_name, obj_type in state['object-of-type']:
        type_of.setdefault(obj_name, obj_type)

    for obj_name, obj_loc in state.get('object-at', []):
        obj_type = type_of.get(obj_name)
        assert obj_type is not None, f'Object {obj_name} has no object-of-type entry.'
        simulator.objects[obj_name] = (obj_type, int(obj_loc[1:]))

    for inv_id, obj_name in state.get('inventory-holding', []):
        obj_type = type_of.get(obj_name)
        assert obj_type is not None, f'Object {obj_name} has no object-of-type entry.'
        simulator.inventory[int(inv_id[1:])] = (obj_type, obj_name)

    for obj_name, obj_type in state['object-of-type']:
        if obj_type == 'Hypothetical':
            simulator.hypothetical.add(obj_name)
def _find_string_start_with(list_of_string, start, first=False):
    """Select the strings in ``list_of_string`` that begin with ``start``.

    Args:
        list_of_string: iterable of strings to filter.
        start: the prefix to match.
        first: if True, return only the first matching string.

    Returns:
        A list of all matching strings, in order, when ``first`` is False.
        When ``first`` is True, the first matching string, or an empty list
        if nothing matches.
    """
    matches = (item for item in list_of_string if item.startswith(start))
    if first:
        # An empty list (not None) is the "no match" result in this mode.
        return next(matches, [])
    return list(matches)