#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : behavior_utils.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Any, Optional, Union, Sequence, NamedTuple, List, Dict
from jacinle.utils.printing import indent_text
from concepts.dsl.dsl_types import AutoType, Variable, ObjectConstant
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.expression import Expression, ExpressionDefinitionContext, ValueOutputExpression, ObjectOrValueOutputExpression, FunctionApplicationExpression, VariableExpression, VariableAssignmentExpression, ObjectConstantExpression, is_null_expression
from concepts.dsl.expression import AssignExpression, BoolExpression, BoolOpType
from concepts.dsl.expression_utils import surface_fol_downcast, find_free_variables
from concepts.dm.crow.controller import CrowController, CrowControllerApplicationExpression
from concepts.dm.crow.crow_expression_utils import crow_replace_expression_variables
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.behavior import CrowBehavior, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression, CrowBehaviorOrderingSuite, CrowBehaviorCommit
from concepts.dm.crow.behavior import CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite, CrowEffectApplier
from concepts.dm.crow.crow_domain import CrowDomain, CrowState
__all__ = [
'ApplicableBehaviorItem', 'match_applicable_behaviors', 'match_policy_applicable_behaviors',
'crow_replace_expression_variables_ext', 'format_behavior_statement', 'format_behavior_program',
'execute_effect_statements', 'execute_behavior_effect_advanced', 'execute_effect_applier'
]
[docs]
class ApplicableBehaviorItem(NamedTuple):
"""A behavior item that is applicable to the given goal."""
behavior: CrowBehavior
"""The behavior that is applicable."""
bounded_variables: Union[Dict[str, Union[Variable, ObjectConstant, TensorValue]], Dict[str, ObjectOrValueOutputExpression]]
"""The bounded variables that are used to instantiate the behavior. If deferred_execution is True, the values are ObjectOrValueOutputExpression, otherwise Variable or ObjectConstant."""
defered_execution: bool = False
"""Whether the behavior should be executed in deferred mode."""
[docs]
def match_applicable_behaviors(
domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
return_all_candidates: bool = True
) -> List[ApplicableBehaviorItem]:
candidate_behaviors = list()
for behavior in domain.behaviors.values():
goal_expr = behavior.goal
if is_null_expression(goal_expr):
continue
if (variable_binding := surface_fol_downcast(goal_expr, goal)) is None:
continue
# TODO(Jiayuan Mao @ 2024/07/18): think about if this "promoted" variable binding is correct.
# My current intuition is that, if a variable got changed to a different value later, this procedure will break...
for variable, value in variable_binding.items():
if isinstance(value, Variable):
# assert value.name in goal_scope, f"Variable {value.name} not found in goal scope."
if value.name in goal_scope:
variable_binding[variable] = goal_scope[value.name]
candidate_behaviors.append(ApplicableBehaviorItem(behavior, variable_binding))
# TODO: implement return_all_candidates
return candidate_behaviors
[docs]
def match_policy_applicable_behaviors(
domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
return_all_candidates: bool = True,
pachieve_kwargs: Dict[str, Any] = None,
) -> List[ApplicableBehaviorItem]:
candidate_behaviors = match_applicable_behaviors(domain, state, goal, goal_scope, return_all_candidates)
if len(candidate_behaviors) > 0:
return candidate_behaviors
free_variables = find_free_variables(goal)
bounded_variables = {var.name: var for var in free_variables}
if isinstance(goal, BoolExpression) and goal.bool_op is BoolOpType.AND:
subgoals = [CrowAchieveExpression(subgoal, once=False, is_policy_achieve=True, **pachieve_kwargs) for subgoal in goal.arguments]
if pachieve_kwargs['ordered']:
goal_set = CrowBehaviorOrderingSuite.make_sequential(subgoals)
else:
goal_set = CrowBehaviorOrderingSuite.make_unordered(subgoals)
if pachieve_kwargs['serializable']:
program = CrowBehaviorOrderingSuite.make_sequential(goal_set)
else:
program = CrowBehaviorOrderingSuite.make_sequential(CrowBehaviorOrderingSuite.make_promotable(goal_set), _skip_simplify=True)
return [ApplicableBehaviorItem(CrowBehavior('__pachive__', free_variables, None, program), bounded_variables)]
elif isinstance(goal, BoolExpression) and goal.bool_op is BoolOpType.OR:
subgoals = [CrowAchieveExpression(subgoal, once=False, is_policy_achieve=True, **pachieve_kwargs) for subgoal in goal.arguments]
if pachieve_kwargs['serializable']:
subgoals = [CrowBehaviorOrderingSuite.make_sequential(subgoal) for subgoal in subgoals]
else:
subgoals = [CrowBehaviorOrderingSuite.make_sequential(CrowBehaviorOrderingSuite.make_promotable(subgoal), _skip_simplify=True) for subgoal in subgoals]
return [ApplicableBehaviorItem(CrowBehavior('__pachive__', free_variables, None, program), bounded_variables) for program in subgoals]
else:
raise ValueError(f"Goal type {goal} is not supported.")
[docs]
def crow_replace_expression_variables_ext(
expr: Union[Expression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression],
mappings: Optional[Dict[Union[FunctionApplicationExpression, VariableExpression], Union[Variable, ValueOutputExpression]]] = None,
ctx: Optional[ExpressionDefinitionContext] = None,
) -> Union[ObjectOrValueOutputExpression, VariableAssignmentExpression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression]:
if isinstance(expr, Expression):
return crow_replace_expression_variables(expr, mappings, ctx)
if isinstance(expr, CrowBindExpression):
return CrowBindExpression(expr.variables, crow_replace_expression_variables(expr.goal, mappings, ctx))
if isinstance(expr, CrowAssertExpression):
return CrowAssertExpression(crow_replace_expression_variables(expr.bool_expr, mappings, ctx), once=expr.once)
if isinstance(expr, CrowRuntimeAssignmentExpression):
return CrowRuntimeAssignmentExpression(expr.variable, crow_replace_expression_variables(expr.value, mappings, ctx))
if isinstance(expr, CrowFeatureAssignmentExpression):
return CrowFeatureAssignmentExpression(crow_replace_expression_variables(expr.feature, mappings, ctx), crow_replace_expression_variables(expr.value, mappings, ctx))
if isinstance(expr, CrowControllerApplicationExpression):
return CrowControllerApplicationExpression(expr.controller, [crow_replace_expression_variables(arg, mappings, ctx) for arg in expr.arguments])
if isinstance(expr, CrowAchieveExpression):
return CrowAchieveExpression(crow_replace_expression_variables(expr.goal, mappings, ctx), once=expr.once)
if isinstance(expr, CrowUntrackExpression):
return CrowUntrackExpression(crow_replace_expression_variables(expr.goal, mappings, ctx))
if isinstance(expr, CrowBehaviorApplicationExpression):
return CrowBehaviorApplicationExpression(expr.behavior, [crow_replace_expression_variables(arg, mappings, ctx) for arg in expr.arguments])
if isinstance(expr, CrowBehaviorEffectApplicationExpression):
return CrowBehaviorEffectApplicationExpression(expr.behavior, [crow_replace_expression_variables(arg, mappings, ctx) for arg in expr.arguments])
raise ValueError(f'Invalid expression ({type(expr)}): {expr}')
[docs]
def execute_effect_statements(
executor: CrowExecutor, statements: Sequence[Union[CrowFeatureAssignmentExpression, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite]],
state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
scope: Optional[dict] = None, action_index: Optional[int] = None
):
if scope is None:
scope = dict()
for stmt in statements:
if isinstance(stmt, CrowFeatureAssignmentExpression):
with executor.update_effect_mode(stmt.evaluation_mode, action_index=action_index):
executor.execute(AssignExpression(stmt.feature, stmt.value), state=state, csp=csp, bounded_variables=scope)
elif isinstance(stmt, CrowBehaviorForeachLoopSuite) and stmt.is_foreach_in_expression:
var = stmt.variable
values = executor.execute(stmt.loop_in_expression, state=state, csp=csp, bounded_variables=scope).values
for value in values:
new_scope = scope.copy()
new_scope[var.name] = value
execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope)
elif isinstance(stmt, CrowBehaviorForeachLoopSuite):
var = stmt.variable
new_scope = scope.copy()
for i, name in enumerate(state.object_type2name[var.typename]):
new_scope[var.name] = ObjectConstant(name, var.dtype)
execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope)
elif isinstance(stmt, CrowBehaviorConditionSuite):
rv = executor.execute(stmt.condition, state=state, bounded_variables=scope).item()
if isinstance(rv, OptimisticValue):
raise NotImplementedError('OptimisticValue is not supported in the current implementation.')
if bool(rv):
execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=scope)
else:
if stmt.else_statements is not None:
execute_effect_statements(executor, stmt.else_statements, state, csp=csp, scope=scope)
else:
raise ValueError(f'Unsupported statement type: {type(stmt)}')
[docs]
def execute_behavior_effect_advanced(executor: CrowExecutor, behavior: Union[CrowBehavior, CrowController], state: CrowState, scope: dict, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState:
"""Execute a behavior effect with for-loops and conditional statements."""
new_state = state.clone()
execute_effect_statements(executor, behavior.effect_body.statements, new_state, csp=csp, scope=scope, action_index=action_index)
return new_state
[docs]
def execute_effect_applier(executor: CrowExecutor, applier: CrowEffectApplier, state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState:
return execute_effect_statements(executor, applier.statements, state, csp=csp, scope=applier.bounded_variables, action_index=action_index)