Source code for concepts.dm.crow.domain

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : domain3.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/15/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from typing import Any, Optional, Union, Sequence, List, Dict

from jacinle.logging import get_logger

from concepts.dsl.dsl_types import ObjectType, PyObjValueType, TensorValueTypeBase, ListType, Variable, ObjectConstant
from concepts.dsl.dsl_types import VectorValueType, ScalarValueType, NamedTensorValueType
from concepts.dsl.dsl_types import BOOL, INT64, FLOAT32
from concepts.dsl.dsl_functions import Function, FunctionType, FunctionArgumentListType, FunctionReturnType
from concepts.dsl.dsl_domain import DSLDomainBase
from concepts.dsl.expression import Expression, ValueOutputExpression, VariableOrValueOutputExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import NamedObjectTensorState, ObjectNameArgument

from concepts.dm.crow.controller import CrowController
from concepts.dm.crow.function import CrowFeature, CrowFunction
from concepts.dm.crow.generator import CrowGeneratorBase, CrowDirectedGenerator, CrowUndirectedGenerator
from concepts.dm.crow.action import CrowPrecondition, CrowEffect, CrowActionOrderingSuite, CrowAction

logger = get_logger(__file__)

__all__ = ['CrowDomain', 'CrowProblem', 'CrowState']


[docs]class CrowDomain(DSLDomainBase): """The planning domain definition."""
[docs] def __init__(self, name: Optional[str] = None): """Initialize a planning domain. Args: name: The name of the domain. """ super().__init__(name) self.features = dict() self.functions = dict() self.controllers = dict() self.actions = dict() self.generators = dict() self.external_functions = dict() self.external_function_crossrefs = dict()
name: str """The name of the domain.""" types: Dict[str, Union[ObjectType, PyObjValueType, TensorValueTypeBase]] """The types defined in the domain, as a dictionary from type names to types.""" functions: Dict[str, CrowFunction] """A mapping of functions: from function name to the corresponding :class:`~concepts.dm.crow.function.CrowFunction` class.""" features: Dict[str, CrowFeature] """A mapping of features: from feature name to the corresponding :class:`~concepts.dm.crow.feature.CrowFeature` class.""" controllers: Dict[str, CrowController] """A mapping of controllers: from controller name to the corresponding :class:`~concepts.dm.crow.controller.Controller` class.""" constants: Dict[str, ObjectConstant] """The constants defined in the domain, as a dictionary from constant names to values.""" actions: Dict[str, CrowAction] """A mapping of actions: from action name to the corresponding :class:`~concepts.dm.crow.action.CrowAction` class.""" generators: Dict[str, CrowGeneratorBase] """A mapping of generators: from generator name to the corresponding :class:`~concepts.dm.crow.generator.CrowGeneratorBase` class.""" external_functions: Dict[str, Function] """A mapping of external functions: from function name to the corresponding :class:`~concepts.dsl.dsl_functions.Function` class.""" external_function_crossrefs: Dict[str, str] """A mapping from function name to another function name. This is useful when defining one function as an derived function of another function."""
[docs] def set_name(self, name: str): """Set the name of the domain. Args: name: the new name of the domain. """ self.name = name
BUILTIN_TYPES = ['object', 'pyobject', 'bool', 'int64', 'float32', '__control__', '__totally_ordered_plan__', '__partially_ordered_plan__'] BUILTIN_NUMERIC_TYPES = { 'bool': BOOL, 'int64': INT64, 'float32': FLOAT32 } BUILTIN_PYOBJ_TYPES = { '__control__': PyObjValueType('__control__', alias='__control__'), '__regression_body_item__': PyObjValueType('__regression_body_item__', alias='__regression_body_item__'), '__totally_ordered_plan__': ListType(PyObjValueType('__regression_body_item__'), alias='__totally_ordered_plan__'), }
[docs] def define_type(self, typename, parent_name: Optional[Union[VectorValueType, ScalarValueType, str]] = 'object') -> Union[ObjectType, PyObjValueType, VectorValueType, ScalarValueType]: """Define a new type. Args: typename: the name of the new type. parent_name: the parent type of the new type, default to 'object'. Returns: the newly defined type. """ if typename == 'object': raise ValueError('Typename "object" is reserved.') elif typename in type(self).BUILTIN_TYPES: raise ValueError('Typename {} is a built-in type.'.format(typename)) assert isinstance(parent_name, (str, VectorValueType)), f'Currently only support inheritance from builtin types: {type(self).BUILTIN_TYPES}.' if isinstance(parent_name, str): if parent_name == 'object': self.types[typename] = ObjectType(typename) elif parent_name == 'pyobject': dtype = PyObjValueType(typename) self.types[typename] = dtype self.declare_external_function(f'type::{typename}::equal', [dtype, dtype], BOOL) elif parent_name == 'int64': dtype = NamedTensorValueType(typename, INT64) self.types[typename] = dtype self.declare_external_function(f'type::{typename}::equal', [dtype, dtype], BOOL) elif parent_name == 'float32': dtype = NamedTensorValueType(typename, FLOAT32) self.types[typename] = dtype self.declare_external_function(f'type::{typename}::equal', [dtype, dtype], BOOL) else: raise ValueError(f'Unknown parent type: {parent_name}.') elif isinstance(parent_name, VectorValueType): dtype = NamedTensorValueType(typename, parent_name) self.types[typename] = dtype self.declare_external_function(f'type::{typename}::equal', [dtype, dtype], BOOL) else: raise ValueError(f'Unknown parent type: {parent_name}.') return self.types[typename]
[docs] def has_type(self, typename: str): """Check whether a type exists. Args: typename: the name of the type. Returns: whether the type exists. """ if typename in type(self).BUILTIN_TYPES: return True if typename in type(self).BUILTIN_PYOBJ_TYPES: return True return typename in self.types
[docs] def get_type(self, typename: str) -> Union[ObjectType, PyObjValueType, VectorValueType, ScalarValueType, NamedTensorValueType]: """Get a type by name. Args: typename: the name of the type. Returns: the type with the given name. """ if typename == 'object': return ObjectType('object') elif typename == 'pyobject': return PyObjValueType('pyobject') if typename in type(self).BUILTIN_NUMERIC_TYPES: return type(self).BUILTIN_NUMERIC_TYPES[typename] elif typename in type(self).BUILTIN_PYOBJ_TYPES: return type(self).BUILTIN_PYOBJ_TYPES[typename] if typename not in self.types: raise ValueError(f'Unknown type: {typename}, known types are: {list(self.types.keys())}.') return self.types[typename]
[docs] def define_feature( self, name: str, arguments: FunctionArgumentListType, return_type: FunctionReturnType = BOOL, *, derived_expression: Optional[ValueOutputExpression] = None, observation: Optional[bool] = None, state: Optional[bool] = None, ) -> CrowFeature: """Define a new feature. Args: name: the name of the new feature. arguments: the arguments of the new feature. return_type: the return type of the new feature. derived_expression: the derived expression of the new feature. observation: whether the new feature is an observation variable. state: whether the new feature is a state variable. Returns: the newly defined feature. """ if name in self.features: raise ValueError(f'Feature {name} already exists.') feature = CrowFeature( name, FunctionType(arguments, return_type), derived_expression=derived_expression, observation=observation, state=state ) self.features[name] = feature return feature
[docs] def has_feature(self, name: str) -> bool: """Check whether a feature exists. Args: name: the name of the feature. Returns: whether the feature exists. """ return name in self.features
[docs] def get_feature(self, name: str) -> CrowFeature: """Get a feature by name. Args: name: the name of the feature. Returns: the feature with the given name. """ if name not in self.features: raise ValueError(f'Unknown feature: {name}.') return self.features[name]
[docs] def define_crow_function( self, name: str, arguments: FunctionArgumentListType, return_type: FunctionReturnType = BOOL, *, derived_expression: Optional[ValueOutputExpression] = None, generator_placeholder: bool = False, inplace_generators: Optional[Sequence[str]] = None, simulation: bool = False, execution: bool = False, is_generator_function: bool = False, ): """Define a new function. Args: name: the name of the new function. arguments: the arguments of the new function. return_type: the return type of the new function. generator_placeholder: whether the new function is a generator placeholder. inplace_generators: a list of generators that will be defined in-place for this function. simulation: whether the new function requires the up-to-date simulation state to evaluate. execution: whether the new function requires the up-to-date execution state to evaluate. is_generator_function: whether the new function is a generator function. Returns: the newly defined function. """ if name in self.functions: raise ValueError(f'Function {name} already exists.') function = CrowFunction( name, FunctionType(arguments, return_type, is_generator_function=is_generator_function), derived_expression=derived_expression, generator_placeholder=generator_placeholder, inplace_generators=inplace_generators, simulation=simulation, execution=execution ) self.functions[name] = function if function.derived_expression is not None: self.external_functions[name] = function return function
[docs] def has_function(self, name: str) -> bool: """Check whether a function exists. Args: name: the name of the function. Returns: whether the function exists. """ return name in self.functions
[docs] def get_function(self, name: str) -> CrowFunction: """Get a function by name. Args: name: the name of the function. Returns: the function with the given name. """ if name not in self.functions: raise ValueError(f'Unknown function: {name}.') return self.functions[name]
[docs] def define_controller(self, name: str, arguments: Sequence[Variable], extends: Optional[str] = None): """Define a new controller. Args: name: the name of the new controller. arguments: the arguments of the new controller. Returns: the newly defined controller. """ if name in self.controllers: raise ValueError(f'Controller {name} already exists.') controller = CrowController(name, arguments, extends=extends) self.controllers[name] = controller return controller
[docs] def has_controller(self, name: str) -> bool: """Check whether a controller exists. Args: name: the name of the controller. Returns: whether the controller exists. """ return name in self.controllers
[docs] def get_controller(self, name: str) -> CrowController: """Get a controller by name. Args: name: the name of the controller. Returns: the controller with the given name. """ if name not in self.controllers: raise ValueError(f'Unknown controller: {name}.') return self.controllers[name]
[docs] def define_action( self, name: str, arguments: Sequence[Variable], goal: ValueOutputExpression, body: CrowActionOrderingSuite, preconditions: Sequence[CrowPrecondition], effects: Sequence[CrowEffect], always: bool = False ): """Define a new action. Args: name: the name of the new action. arguments: the arguments of the new action. goal: the goal of the new action. body: the body of the new action. preconditions: the preconditions of the new action. effects: the effects of the new action. always: whether the new action is always feasible. Returns: the newly defined action. """ if name in self.actions: raise ValueError(f'Action {name} already exists.') self.actions[name] = action = CrowAction(name, arguments, goal, body, preconditions, effects, always=always) return action
[docs] def has_action(self, name: str) -> bool: return name in self.actions
[docs] def get_action(self, name: str) -> CrowAction: if name not in self.actions: raise ValueError(f'Action {name} not found.') return self.actions[name]
[docs] def define_generator( self, name: str, arguments: Sequence[Variable], certifies: Union[Sequence[ValueOutputExpression], ValueOutputExpression], inputs: Optional[Sequence[Variable]] = None, outputs: Optional[Sequence[Variable]] = None, priority: int = 0, simulation: bool = False, execution: bool = False ) -> CrowGeneratorBase: """Define a new generator. Args: name: the name of the new generator. arguments: the parameters of the new generator. certifies: the certified condition of the new generator. inputs: the input variables of the new generator. outputs: the output variables of the new generator. priority: the priority of the new generator. simulation: whether the new generator requires the up-to-date simulation state to evaluate. execution: whether the new generator requires the up-to-date execution state to evaluate. Returns: the newly defined generator. """ if name in self.generators: raise ValueError(f'Generator {name} already exists.') if not isinstance(certifies, (list, tuple)): certifies = [certifies] if inputs is None and outputs is None: generator = CrowUndirectedGenerator(name, arguments, certifies, priority=priority) else: assert inputs is not None and outputs is not None, 'Both inputs and outputs should be specified.' generator = CrowDirectedGenerator(name, arguments, certifies, inputs, outputs, priority=priority) self.generators[name] = generator return generator
[docs] def has_generator(self, name: str) -> bool: return name in self.generators
[docs] def get_generator(self, name: str) -> CrowGeneratorBase: if name in self.generators: return self.generators[name] raise ValueError(f'Generator {name} not found.')
[docs] def declare_external_function(self, function_name: str, argument_types: FunctionArgumentListType, return_type: FunctionReturnType, kwargs: Optional[Dict[str, Any]] = None) -> Function: """Declare an external function. Args: function_name: the name of the external function. argument_types: the argument types of the external function. return_type: the return type of the external function. kwargs: the keyword arguments of the external function. Supported keyword arguments are: - ``observation``: whether the external function is an observation variable. - ``state``: whether the external function is a state variable. """ if kwargs is None: kwargs = dict() self.external_functions[function_name] = CrowFunction(function_name, FunctionType(argument_types, return_type), **kwargs) return self.external_functions[function_name]
[docs] def declare_external_function_crossref(self, function_name: str, cross_ref_name: str): """Declare a cross-reference to an external function. This is useful when one function is an derived function of another function. Args: function_name: the name of the external function. cross_ref_name: the name of the cross-reference. """ self.external_function_crossrefs[function_name] = cross_ref_name
[docs] def parse(self, string: Union[str, Expression], state: Optional['State'] = None, variables: Optional[Sequence[Variable]] = None) -> Expression: """Parse a string into an expression. Args: string: the string to be parsed. variables: the variables to be used in the expression. Returns: the parsed expression. """ if isinstance(string, Expression): return string from concepts.dm.crow.parsers.crow_parser import parse_expression return parse_expression(self, string, state=state, variables=variables)
[docs] def make_executor(self) -> 'CrowExecutor': """Make an executor for this domain.""" from concepts.dm.crow.executors.crow_executor import CrowExecutor return CrowExecutor(self)
[docs] def incremental_define(self, string: str): """Incrementally define new parts of the domain. Args: string: the string to be parsed and defined. """ from concepts.dm.crow.parsers.crow_parser import load_domain_string_incremental return load_domain_string_incremental(self, string)
[docs] def print_summary(self, external_functions: bool = False, full_generators: bool = False): """Print a summary of the domain."""
# TODO(Jiayuan Mao @ 2024/03/15): implement this.
[docs] def post_init(self): """Post-initialization of the domain. This function should be called by the domain generator after all the domain definitions (predicates and operators) are done. Currently, the following post-initialization steps are performed: 1. Analyze the static predicates. """ self._analyze_static_predicates()
def _analyze_static_predicates(self): """Run static analysis on the predicates to determine which predicates are static.""" # TODO(Jiayuan Mao @ 2024/03/15): implement this. pass
[docs]class CrowProblem(object):
[docs] def __init__(self, domain: CrowDomain): self.domain = domain self.objects = dict() self.state = None self.goal = None
[docs] @classmethod def from_state_and_goal(cls, domain: CrowDomain, state: 'CrowState', goal: ValueOutputExpression): problem = cls(domain) problem.state = state problem.goal = goal return problem
[docs] def add_object(self, name: str, typename: str): self.objects[name] = typename
[docs] def init_state(self): if self.state is not None: return domain = self.domain self.state = CrowState.make_empty_state(domain, self.objects)
[docs] def set_goal(self, goal: ValueOutputExpression): self.goal = goal
[docs]class CrowState(NamedObjectTensorState):
[docs] @classmethod def make_empty_state(cls, domain: CrowDomain, objects: Dict[str, str]): object_names = list(objects.keys()) object_types = [domain.types[objects[name]] for name in object_names] state = cls(None, object_names, object_types) for feature_name, feature in domain.features.items(): if not feature.is_state_variable: continue return_type = feature.return_type if feature_name not in state.features: sizes = list() for arg_def in feature.arguments: sizes.append(len(state.object_type2name[arg_def.typename]) if arg_def.typename in state.object_type2name else 0) sizes = tuple(sizes) state.features[feature_name] = TensorValue.make_empty(return_type, [var.name for var in feature.arguments], sizes) return state
[docs] def fast_set_value(self, feature_name: str, indices: Sequence[str], value: Any): if feature_name not in self.features: raise ValueError(f'Unknown feature: {feature_name}.') indices = tuple(self.get_typed_index(arg) for arg in indices) self.features[feature_name].fast_set_index(indices, value)