Source code for concepts.dm.crow.action

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : action.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/16/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import warnings
from typing import Optional, Union, Iterator, Sequence, Tuple

from jacinle.utils.enum import JacEnum
from jacinle.utils.printing import indent_text
from jacinle.utils.meta import repr_from_str

from concepts.dsl.dsl_types import ObjectType, ValueType, Variable
from concepts.dsl.expression import ObjectOrValueOutputExpression, ValueOutputExpression, VariableAssignmentExpression, ListExpansionExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dm.crow.function import CrowFunctionEvaluationMode
from concepts.dm.crow.controller import CrowControllerApplicationExpression
from concepts.dm.crow.generator import CrowGeneratorApplicationExpression

# Names exported by ``from concepts.dm.crow.action import *``: rule components,
# primitive body statements, body suites, the action itself, and action applications.
__all__ = [
    'CrowPrecondition', 'CrowEffect',
    'CrowActionBodyPrimitiveBase', 'CrowAchieveExpression', 'CrowBindExpression', 'CrowRuntimeAssignmentExpression', 'CrowAssertExpression',
    'CrowActionBodySuiteBase', 'CrowActionConditionSuite', 'CrowActionWhileLoopSuite', 'CrowActionOrdering', 'CrowActionOrderingSuite',
    'CrowAction', 'CrowActionBodyItem',
    'CrowActionApplier', 'CrowActionApplicationExpression'
]


class CrowPrecondition(object):
    """The precondition of a rule."""

    bool_expr: ValueOutputExpression
    """The boolean expression of the precondition."""

    def __init__(self, bool_expr: ValueOutputExpression):
        """Initialize the precondition.

        Args:
            bool_expr: the boolean expression of the precondition.
        """
        self.bool_expr = bool_expr

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'Precondition({})'.format(self.bool_expr)
class CrowEffect(object):
    """The effect of a rule."""

    assign_expr: VariableAssignmentExpression
    """The assignment expression of the effect."""

    evaluation_mode: CrowFunctionEvaluationMode
    """The evaluation mode of the effect."""

    def __init__(self, assign_expr: VariableAssignmentExpression, simulation: bool = False, execution: bool = False):
        """Initialize the effect.

        Args:
            assign_expr: the assignment expression of the effect.
            simulation: first flag forwarded to :meth:`CrowFunctionEvaluationMode.from_bools`.
            execution: second flag forwarded to :meth:`CrowFunctionEvaluationMode.from_bools`.
        """
        mode = CrowFunctionEvaluationMode.from_bools(simulation, execution)
        self.assign_expr = assign_expr
        self.evaluation_mode = mode

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'Effect({}, {})'.format(self.assign_expr, self.evaluation_mode)
# Type alias for the kinds of statements that may appear in an action body.
# The members are forward references (strings). Adjacent string literals are
# concatenated by Python, so each member must be followed by a comma; a missing
# comma silently merges two names into one unresolvable reference.
CrowActionBodyItem = Union[
    'CrowActionBodyPrimitiveBase',
    'CrowActionBodySuiteBase',
    'CrowControllerApplicationExpression',
    'ListExpansionExpression',
]
class CrowActionBodyPrimitiveBase(object):
    """Marker base class for the primitive (non-suite) statements of an action body."""
class CrowAchieveExpression(CrowActionBodyPrimitiveBase):
    """A primitive body statement that wraps a goal expression to be achieved."""

    goal: ValueOutputExpression
    """The goal of the achieve expression."""

    def __init__(self, goal: ValueOutputExpression):
        """Initialize the achieve expression.

        Args:
            goal: the goal of the achieve expression.
        """
        self.goal = goal

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'Achieve({})'.format(self.goal)
class CrowBindExpression(CrowActionBodyPrimitiveBase):
    """A primitive body statement that binds a tuple of variables against a goal."""

    variables: Tuple[Variable, ...]
    """The variables to bind."""

    goal: Union[ValueOutputExpression, CrowGeneratorApplicationExpression]
    """The goal of the bind expression."""

    def __init__(self, variables: Sequence[Variable], goal: Union[ValueOutputExpression, CrowGeneratorApplicationExpression]):
        """Initialize the bind expression.

        Args:
            variables: the variables to bind; stored as a tuple.
            goal: the goal of the bind expression.
        """
        self.variables = tuple(variables)
        self.goal = goal
        # Computed once at construction time from the dtypes of the variables.
        self.is_object_bind = self._check_object_bind()

    def _check_object_bind(self) -> bool:
        """Check if all variables to be bound are object variables. Note that we currently do not support mixed bind."""
        object_flags = tuple(isinstance(variable.dtype, ObjectType) for variable in self.variables)
        # A mixed bind (some object-typed, some not) is not rejected here; it simply yields False.
        return all(object_flags)

    def __repr__(self):
        return str(self)

    def __str__(self):
        variable_list = ', '.join(map(str, self.variables))
        return 'Bind({} <- {})'.format(variable_list, self.goal)
class CrowRuntimeAssignmentExpression(CrowActionBodyPrimitiveBase):
    """A primitive body statement that pairs a variable with the value expression assigned to it."""

    variable: Variable
    """The variable to assign."""

    value: ValueOutputExpression
    """The value to assign."""

    def __init__(self, variable: Variable, value: ValueOutputExpression):
        """Initialize the assignment.

        Args:
            variable: the variable to assign.
            value: the value to assign.
        """
        self.variable, self.value = variable, value

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'Assign({} <- {})'.format(self.variable, self.value)
class CrowAssertExpression(CrowActionBodyPrimitiveBase):
    """A primitive body statement that wraps a boolean expression to be asserted."""

    bool_expr: ValueOutputExpression
    """The boolean expression of the assert expression."""

    def __init__(self, bool_expr: ValueOutputExpression):
        """Initialize the assert expression.

        Args:
            bool_expr: the boolean expression of the assert expression.
        """
        self.bool_expr = bool_expr

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'Assert({})'.format(self.bool_expr)
class CrowActionBodySuiteBase(object):
    """Base class of body suites: an immutable tuple of statements with text rendering."""

    def __init__(self, statements: Sequence[CrowActionBodyItem]):
        """Initialize the suite.

        Args:
            statements: the statements of the suite; stored as a tuple.
        """
        self._statements = tuple(statements)

    @property
    def statements(self) -> Tuple[CrowActionBodyItem, ...]:
        """The statements in a action body suite."""
        return self._statements

    def __str__(self):
        # One statement per line.
        return '\n'.join(str(statement) for statement in self.statements)

    def __repr__(self):
        class_name = self.__class__.__name__
        indented_body = indent_text(self.__str__())
        return '{}{{\n{}\n}}'.format(class_name, indented_body)
class CrowActionConditionSuite(CrowActionBodySuiteBase):
    """A body suite guarded by a condition; rendered as an ``if`` block."""

    condition: ValueOutputExpression
    """The condition of the action body suite."""

    def __init__(self, condition: ValueOutputExpression, statements: Sequence[CrowActionBodyItem]):
        """Initialize the conditional suite.

        Args:
            condition: the condition of the suite.
            statements: the statements of the suite.
        """
        super().__init__(statements)
        self.condition = condition

    def __str__(self):
        indented_body = indent_text(super().__str__())
        return 'if {}:\n{}'.format(self.condition, indented_body)
class CrowActionWhileLoopSuite(CrowActionBodySuiteBase):
    """A body suite guarded by a loop condition; rendered as a ``while`` block."""

    condition: ValueOutputExpression
    """The condition of the action body suite."""

    def __init__(self, condition: ValueOutputExpression, statements: Sequence[CrowActionBodyItem]):
        """Initialize the while-loop suite.

        Args:
            condition: the condition of the suite.
            statements: the statements of the suite.
        """
        super().__init__(statements)
        self.condition = condition

    def __str__(self):
        indented_body = indent_text(super().__str__())
        return 'while {}:\n{}'.format(self.condition, indented_body)
class CrowActionOrdering(JacEnum):
    """The ordering of statements in an action body ordering suite. There are five types of ordering:

    - SEQUENTIAL: the statements are executed sequentially.
    - UNORDERED: the statements are executed in an unordered way. All achieve expressions would be achieved, but the order is not specified.
    - PROMOTABLE: the statements are executed in a promotable way. In particular, all achieve expressions would be achieved, but they might be promoted to the front of the action sequence.
    - PREAMBLE: the statements are executed in a preamble way. This must be the first statement in the action body and it specifies the preamble of the action, which will be executed at the place where the action is refined.
    - CRITICAL: the statements are executed in a critical way. This can only be used inside the promotable body and it specifies a critical section.
    """

    SEQUENTIAL = 'sequential'
    UNORDERED = 'unordered'
    PROMOTABLE = 'promotable'
    PREAMBLE = 'preamble'
    CRITICAL = 'critical'
class CrowActionOrderingSuite(CrowActionBodySuiteBase):
    """A body suite whose statements carry a :class:`CrowActionOrdering` and an optional variable scope identifier."""

    ORDER = CrowActionOrdering

    order: CrowActionOrdering
    """The ordering of the action body suite."""

    variable_scope_identifier: Optional[int]
    """The identifier of the variable scope. This is only used during search."""

    def __init__(self, order: Union[str, CrowActionOrdering], statements: Sequence[CrowActionBodyItem], variable_scope_identifier: Optional[int] = None):
        """Initialize the ordering suite and simplify its statements.

        Args:
            order: the ordering, as a :class:`CrowActionOrdering` or its string name.
            statements: the statements of the suite.
            variable_scope_identifier: the identifier of the variable scope, if any.
        """
        super().__init__(statements)
        self.order = CrowActionOrdering.from_string(order)
        self.variable_scope_identifier = variable_scope_identifier
        self._simplify_statements()

    def _simplify_statements(self):
        """Simplify the statements in the suite.

        Two rewrites are applied:

        - If this suite is UNORDERED or SEQUENTIAL and its only statement is another ordering suite,
          this suite takes over the child's order, scope identifier and statements.
        - Otherwise, each direct child ordering suite with the same order and the same scope identifier
          is replaced by its own statements.
        """
        if len(self.statements) == 1:
            only = self.statements[0]
            if isinstance(only, CrowActionOrderingSuite):
                if self.order in (CrowActionOrdering.UNORDERED, CrowActionOrdering.SEQUENTIAL):
                    self.order = only.order
                    self.variable_scope_identifier = only.variable_scope_identifier
                    self._statements = only.statements
                    return

        simplified_statements = []
        for stmt in self.statements:
            mergeable = (
                isinstance(stmt, CrowActionOrderingSuite)
                and stmt.order == self.order
                and self.variable_scope_identifier == stmt.variable_scope_identifier
            )
            if mergeable:
                simplified_statements.extend(stmt.statements)
            else:
                simplified_statements.append(stmt)
        self._statements = tuple(simplified_statements)

    def clone(self, variable_scope_identifier: int):
        """Return a copy of this suite (and, recursively, of nested ordering suites) tagged with the given scope identifier.

        Statements that are not ordering suites are shared with the original, not copied.
        """
        statements = [
            stmt.clone(variable_scope_identifier) if isinstance(stmt, CrowActionOrderingSuite) else stmt
            for stmt in self.statements
        ]
        return CrowActionOrderingSuite(self.order, statements, variable_scope_identifier)

    def is_single_statement(self) -> bool:
        """Return True if the suite contains exactly one statement, looking through nested ordering suites."""
        if len(self.statements) != 1:
            return False
        if isinstance(self.statements[0], CrowActionOrderingSuite):
            return self.statements[0].is_single_statement()
        return True

    def _suite_or_none(self, statements: Tuple[CrowActionBodyItem, ...]) -> Optional['CrowActionOrderingSuite']:
        """Build a suite with this suite's order and scope from ``statements``, or return None when nothing is left."""
        if len(statements) == 0:
            return None
        return CrowActionOrderingSuite(self.order, statements, variable_scope_identifier=self.variable_scope_identifier)

    def pop_right_statement(self) -> Iterator[Tuple[Optional['CrowActionOrderingSuite'], CrowActionBodyItem, int]]:
        """Enumerate the ways of removing one statement from the suite.

        For a SEQUENTIAL suite only the last statement is a candidate; for an UNORDERED suite every statement is.
        When a candidate is itself an ordering suite, the statement is popped from it recursively.

        Yields:
            Tuples ``(rest, statement, scope)``: ``rest`` is the remaining suite (None if nothing remains),
            ``statement`` is the removed statement, and ``scope`` is the variable scope identifier of the suite
            that directly contained it.

        Raises:
            ValueError: if the suite is PROMOTABLE or CRITICAL.
        """
        assert len(self.statements) > 0, 'Cannot pop from an empty suite.'

        if self.order == CrowActionOrdering.SEQUENTIAL:
            candidate_indices = (len(self.statements) - 1,)
        elif self.order == CrowActionOrdering.UNORDERED:
            candidate_indices = range(len(self.statements))
        elif self.order in (CrowActionOrdering.PROMOTABLE, CrowActionOrdering.CRITICAL):
            raise ValueError(f'Cannot pop from a {self.order} suite.')
        else:
            # Remaining orderings (PREAMBLE) yield nothing, as before.
            # NOTE(review): confirm whether PREAMBLE should raise like PROMOTABLE/CRITICAL.
            return

        for i in candidate_indices:
            stmt = self.statements[i]
            before, after = self.statements[:i], self.statements[i + 1:]
            if isinstance(stmt, CrowActionOrderingSuite):
                for rest, rstmt, scope in stmt.pop_right_statement():
                    # Keep what is left of the nested suite in place, or drop it if it became empty.
                    middle = () if rest is None else (rest,)
                    yield self._suite_or_none(before + middle + after), rstmt, scope
            else:
                yield self._suite_or_none(before + after), stmt, self.variable_scope_identifier

    def iter_statements(self):
        """Iterate over all statements, descending into nested ordering suites."""
        for stmt in self.statements:
            if isinstance(stmt, CrowActionOrderingSuite):
                yield from stmt.iter_statements()
            else:
                yield stmt

    def iter_statements_with_scope(self):
        """Iterate over ``(statement, scope identifier)`` pairs, descending into nested ordering suites.

        The scope identifier is that of the suite directly containing the statement.
        """
        for stmt in self.statements:
            if isinstance(stmt, CrowActionOrderingSuite):
                yield from stmt.iter_statements_with_scope()
            else:
                yield stmt, self.variable_scope_identifier

    def split_preamble_and_promotable(self) -> Tuple[Optional[Tuple[CrowActionBodyItem, ...]], Optional[Tuple[CrowActionBodyItem, ...]], Tuple[CrowActionBodyItem, ...]]:
        """Split the body into three parts: the preamble part, the promotable part, and the rest part.

        The preamble is only recognized as the first statement, and the promotable part as the first
        statement after the (optional) preamble. A missing part is returned as None.
        """
        preamble = None
        promotable = None
        rest = self.statements
        if len(rest) > 0 and isinstance(rest[0], CrowActionOrderingSuite) and rest[0].order == CrowActionOrdering.PREAMBLE:
            preamble = rest[0].statements
            rest = rest[1:]
        if len(rest) > 0 and isinstance(rest[0], CrowActionOrderingSuite) and rest[0].order == CrowActionOrdering.PROMOTABLE:
            promotable = rest[0].statements
            rest = rest[1:]
        return preamble, promotable, rest

    def split_promotable(self) -> Tuple[Optional[Tuple[CrowActionBodyItem, ...]], Tuple[CrowActionBodyItem, ...]]:
        """Split the body into two parts: the promotable part and the rest part.

        Deprecated: use :meth:`split_preamble_and_promotable` instead.
        """
        # stacklevel=2 attributes the warning to the caller rather than to this method.
        warnings.warn('This method is deprecated. Use split_preamble_and_promotable instead.', DeprecationWarning, stacklevel=2)
        body = self.statements
        if len(body) > 0 and isinstance(body[0], CrowActionOrderingSuite) and body[0].order == CrowActionOrdering.PROMOTABLE:
            return body[0].statements, body[1:]
        return None, body

    def get_flatten_body(self) -> Tuple[CrowActionBodyItem, ...]:
        """Get the flatten body of the action. Direct PREAMBLE and PROMOTABLE child suites are replaced
        by their statements; every other statement is kept as is."""
        flatten_body = []
        for item in self.statements:
            if isinstance(item, CrowActionOrderingSuite) and item.order in (CrowActionOrdering.PREAMBLE, CrowActionOrdering.PROMOTABLE):
                flatten_body.extend(item.statements)
            else:
                flatten_body.append(item)
        return tuple(flatten_body)

    def __str__(self):
        return f'{self.order.value}@{self.variable_scope_identifier}{{\n{indent_text(super().__str__())}\n}}'

    @classmethod
    def _from_varargs(cls, order: CrowActionOrdering, statements, variable_scope_identifier: Optional[int]):
        """Build a suite from ``*statements``, also accepting a single tuple/list holding the statements."""
        if len(statements) == 1 and isinstance(statements[0], (tuple, list)):
            statements = statements[0]
        return cls(order, statements, variable_scope_identifier)

    @classmethod
    def make_sequential(cls, *statements: Union[CrowActionBodyItem, Tuple[CrowActionBodyItem, ...]], variable_scope_identifier: Optional[int] = None):
        """Create a SEQUENTIAL suite from the statements (given as varargs or as a single tuple/list)."""
        return cls._from_varargs(CrowActionOrdering.SEQUENTIAL, statements, variable_scope_identifier)

    @classmethod
    def make_unordered(cls, *statements: Union[CrowActionBodyItem, Tuple[CrowActionBodyItem, ...]], variable_scope_identifier: Optional[int] = None):
        """Create an UNORDERED suite from the statements (given as varargs or as a single tuple/list)."""
        return cls._from_varargs(CrowActionOrdering.UNORDERED, statements, variable_scope_identifier)

    @classmethod
    def make_promotable(cls, *statements: Union[CrowActionBodyItem, Tuple[CrowActionBodyItem, ...]], variable_scope_identifier: Optional[int] = None):
        """Create a PROMOTABLE suite from the statements (given as varargs or as a single tuple/list)."""
        return cls._from_varargs(CrowActionOrdering.PROMOTABLE, statements, variable_scope_identifier)

    def format(self, scopes: dict):
        """Render the suite as text, substituting scope variables when a scope identifier is set.

        Args:
            scopes: a mapping from variable scope identifier to a ``name -> value`` mapping.

        Returns:
            The formatted string.

        Raises:
            ValueError: if a scope identifier is set and a statement is not one of the supported statement types.
        """
        # Imported locally, as in the original implementation.
        from concepts.dm.crow.function_utils import flatten_expression
        from concepts.dsl.dsl_types import AutoType, Variable
        import concepts.dsl.expression as E

        scope = None
        if self.variable_scope_identifier is not None:
            raw_scope = scopes[self.variable_scope_identifier]
            scope = {
                E.VariableExpression(Variable(name, AutoType)): E.ObjectConstantExpression(value)
                for name, value in raw_scope.items()
            }

        lines = []
        for stmt in self.statements:
            if isinstance(stmt, CrowActionOrderingSuite):
                text = stmt.format(scopes)
            elif scope is None:
                # No scope to substitute: fall back to the plain string form.
                text = str(stmt)
            elif isinstance(stmt, CrowAchieveExpression):
                text = f'achieve {flatten_expression(stmt.goal, scope)}'
            elif isinstance(stmt, CrowBindExpression):
                text = f'bind {", ".join(str(var) for var in stmt.variables)} <- {flatten_expression(stmt.goal, scope)}'
            elif isinstance(stmt, CrowRuntimeAssignmentExpression):
                text = f'assign {stmt.variable} <- {flatten_expression(stmt.value, scope)}'
            elif isinstance(stmt, CrowAssertExpression):
                text = f'assert {flatten_expression(stmt.bool_expr, scope)}'
            elif isinstance(stmt, CrowControllerApplicationExpression):
                text = f'apply {flatten_expression(stmt, scope)}'
            else:
                raise ValueError(f'Invalid statement: {stmt}')
            # The newline is appended after indenting, so that every branch ends its line the same way.
            lines.append(indent_text(text) + '\n')

        return f'{self.order.value}{{\n' + ''.join(lines) + '}'
class CrowAction(object):
    """An action definition in the CROW planner has the following components:

    - name: the name of the action.
    - arguments: the arguments of the action.
    - goal: the goal of the action.
    - body: the body of the action.
    - preconditions: the preconditions of the action. This is equivalent to having an "assert" statement at the beginning of the body.
    - effects: the effects of the action. This section will update the state of the world.
    """

    name: str
    """The name of the action."""

    arguments: Tuple[Variable, ...]
    """The arguments of the action."""

    goal: ValueOutputExpression
    """The goal of the action."""

    body: CrowActionOrderingSuite
    """The body of the action."""

    preconditions: Tuple[CrowPrecondition, ...]
    """The preconditions of the action."""

    effects: Tuple[CrowEffect, ...]
    """The effects of the action."""

    always: bool
    """Whether the action is always feasible."""

    def __init__(
        self, name: str, arguments: Sequence[Variable], goal: ValueOutputExpression, body: CrowActionOrderingSuite,
        preconditions: Sequence[CrowPrecondition], effects: Sequence[CrowEffect], always: bool = True
    ):
        """Initialize a new action.

        Args:
            name: the name of the action.
            arguments: the arguments of the action.
            goal: the goal of the action.
            body: the body of the action.
            preconditions: the preconditions of the action.
            effects: the effects of the action.
            always: whether the action is always feasible.

        Raises:
            ValueError: if the body is not a well-formed SEQUENTIAL ordering suite (see :meth:`_check_body`).
        """
        self.name = name
        self.arguments = tuple(arguments)
        self.goal = goal
        self.body = body
        self.preconditions = tuple(preconditions)
        self.effects = tuple(effects)
        self.always = always
        self._check_body()

    @property
    def argument_names(self) -> Tuple[str, ...]:
        """The names of the arguments."""
        return tuple(arg.name for arg in self.arguments)

    @property
    def argument_types(self) -> Tuple[Union[ObjectType, ValueType], ...]:
        """The types of the arguments."""
        return tuple(arg.dtype for arg in self.arguments)

    def _check_body(self):
        """Validate the structure of the action body.

        The body must be a SEQUENTIAL ordering suite. A PREAMBLE suite may only be its first statement,
        a PROMOTABLE suite may only come first (or second, right after a preamble), and a CRITICAL suite
        may not appear directly in the main body.

        Raises:
            ValueError: if any of the rules above is violated.
        """
        if not isinstance(self.body, CrowActionOrderingSuite):
            raise ValueError(f'Invalid body type: {type(self.body)}')
        if self.body.order != CrowActionOrdering.SEQUENTIAL:
            raise ValueError(f'Invalid body ordering: {self.body.order}')

        body = self.body.statements
        expected_promotable_body_position = 0
        for i, item in enumerate(body):
            if not isinstance(item, CrowActionOrderingSuite):
                # Primitive statements and other kinds of suites need no structural check here.
                continue
            if item.order in (CrowActionOrdering.UNORDERED, CrowActionOrdering.SEQUENTIAL):
                self._check_regular_body(item.order, item.statements)
            elif item.order == CrowActionOrdering.CRITICAL:
                raise ValueError(f'Invalid critical body item: {type(item)}. Critical body can only be used inside the promotable body.')
            elif item.order == CrowActionOrdering.PREAMBLE:
                if i != 0:
                    raise ValueError('Preamble body can only be the first statement in the main body.')
                expected_promotable_body_position = i + 1
                self._check_preamble_body(item.statements)
            elif item.order == CrowActionOrdering.PROMOTABLE:
                if i != expected_promotable_body_position:
                    raise ValueError('Promotable body can only be the first statement (or the second if there is a preamble suite) in the main body.')
                # Validate the promotable suite itself; it sits at index 1 when a preamble is present,
                # so it must not be looked up as body[0].
                self._check_promotable_body(item.statements)

    def _check_preamble_body(self, body):
        """Validate the statements of a preamble suite. Currently there are no restrictions."""
        pass

    def _check_promotable_body(self, body):
        """Validate the statements of a promotable suite: no nested promotable suites; nested
        unordered/sequential and critical suites are checked recursively."""
        for item in body:
            if not isinstance(item, CrowActionOrderingSuite):
                continue
            if item.order == CrowActionOrdering.PROMOTABLE:
                raise ValueError('Promotable body can only be the first statement in the main body.')
            elif item.order in (CrowActionOrdering.UNORDERED, CrowActionOrdering.SEQUENTIAL):
                self._check_regular_body(item.order, item.statements)
            elif item.order == CrowActionOrdering.CRITICAL:
                self._check_critical_body(item.statements)

    def _check_regular_body(self, order, body):
        """Validate the statements of an unordered/sequential suite: neither promotable nor critical
        suites may be nested inside. ``order`` is accepted for symmetry but not used by the check."""
        for item in body:
            if not isinstance(item, CrowActionOrderingSuite):
                continue
            if item.order == CrowActionOrdering.PROMOTABLE:
                raise ValueError('Promotable body can only be the first statement in the main body.')
            elif item.order == CrowActionOrdering.CRITICAL:
                raise ValueError('Critical body can only be used inside the promotable body.')
            elif item.order in (CrowActionOrdering.UNORDERED, CrowActionOrdering.SEQUENTIAL):
                self._check_regular_body(item.order, item.statements)

    def _check_critical_body(self, body):
        """Validate the statements of a critical suite: it may not contain any ordering suite."""
        for item in body:
            if isinstance(item, CrowActionOrderingSuite):
                raise ValueError(f'Invalid critical body item: {type(item)}. Critical body can only contain primitive items.')

    def is_sequential_only(self) -> bool:
        """Check if the action body contains only sequential statements (i.e., no preamble and promotable)."""
        for item in self.body.statements:
            if isinstance(item, CrowActionOrderingSuite) and item.order in (CrowActionOrdering.PREAMBLE, CrowActionOrdering.PROMOTABLE):
                return False
        return True

    def assign_body_program_scope(self, scope_id: int) -> 'CrowActionOrderingSuite':
        """Assign the program scope to the body. Returns a clone of the body; the action itself is not modified."""
        return self.body.clone(scope_id)

    def short_str(self):
        """Return the one-line form ``name(arg, ...)``."""
        return f'{self.name}({", ".join(map(str, self.arguments))})'

    def long_str(self):
        """Return a multi-line description with the goal, body, preconditions and effects."""
        precondition_string = '\n'.join(map(str, self.preconditions))
        effect_string = '\n'.join(map(str, self.effects))
        # NOTE(review): the leading whitespace of the section labels is reproduced as seen in the
        # available source; confirm it against the intended layout.
        fmt = f'action {self.name}[always={self.always}]({", ".join(map(str, self.arguments))}):\n'
        fmt += f' goal: {indent_text(str(self.goal)).lstrip()}\n'
        fmt += f' body:\n{indent_text(str(self.body), 2)}\n'
        fmt += f' preconditions:\n{indent_text(precondition_string, 2)}\n'
        fmt += f' effects:\n{indent_text(effect_string, 2)}'
        return fmt

    def __str__(self):
        return self.short_str()

    __repr__ = repr_from_str
class CrowActionApplier(object):
    """An action paired with the concrete arguments it is applied to."""

    action: CrowAction
    """The action to be applied."""

    arguments: Tuple[Union[str, TensorValue], ...]
    """The arguments of the action application."""

    def __init__(self, action: CrowAction, arguments: Sequence[Union[str, TensorValue]]):
        """Initialize the applier.

        Args:
            action: the action to be applied.
            arguments: the arguments of the application; stored as a tuple.
        """
        self.action = action
        self.arguments = tuple(arguments)

    def __str__(self):
        argument_list = ', '.join(map(str, self.arguments))
        return '{}({})'.format(self.action.name, argument_list)

    __repr__ = repr_from_str
class CrowActionApplicationExpression(CrowActionBodyPrimitiveBase):
    """A primitive body statement that applies an action to argument expressions."""

    action: CrowAction
    """The action to be applied."""

    arguments: Tuple[ObjectOrValueOutputExpression, ...]
    """The arguments of the action application."""

    def __init__(self, action: CrowAction, arguments: Sequence[ObjectOrValueOutputExpression]):
        """Initialize the application expression.

        Args:
            action: the action to be applied.
            arguments: the argument expressions of the application; stored as a tuple.
        """
        self.action = action
        self.arguments = tuple(arguments)

    def __str__(self):
        argument_list = ', '.join(map(str, self.arguments))
        return '{}({})'.format(self.action.name, argument_list)

    __repr__ = repr_from_str