Source code for concepts.dm.crow.behavior_utils

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : behavior_utils.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from typing import Any, Optional, Union, Sequence, NamedTuple, List, Dict
from jacinle.utils.printing import indent_text

from concepts.dsl.dsl_types import AutoType, Variable, ObjectConstant
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.expression import Expression, ExpressionDefinitionContext, ValueOutputExpression, ObjectOrValueOutputExpression, FunctionApplicationExpression, VariableExpression, VariableAssignmentExpression, ObjectConstantExpression, is_null_expression
from concepts.dsl.expression import AssignExpression, BoolExpression, BoolOpType
from concepts.dsl.expression_utils import surface_fol_downcast, find_free_variables
from concepts.dm.crow.controller import CrowController, CrowControllerApplicationExpression
from concepts.dm.crow.crow_expression_utils import crow_replace_expression_variables
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.behavior import CrowBehavior, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression, CrowBehaviorOrderingSuite, CrowBehaviorCommit
from concepts.dm.crow.behavior import CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite, CrowEffectApplier
from concepts.dm.crow.crow_domain import CrowDomain, CrowState

# Public names exported by ``from concepts.dm.crow.behavior_utils import *``.
__all__ = [
    'ApplicableBehaviorItem', 'match_applicable_behaviors', 'match_policy_applicable_behaviors',
    'crow_replace_expression_variables_ext', 'format_behavior_statement', 'format_behavior_program',
    'execute_effect_statements', 'execute_behavior_effect_advanced', 'execute_effect_applier'
]


class ApplicableBehaviorItem(NamedTuple):
    """A behavior item that is applicable to the given goal.

    Each item pairs a behavior with the variable bindings used to instantiate it, plus a flag
    telling whether the behavior should be executed in deferred mode.
    """

    behavior: CrowBehavior
    """The behavior that is applicable."""

    bounded_variables: Union[Dict[str, Union[Variable, ObjectConstant, TensorValue]], Dict[str, ObjectOrValueOutputExpression]]
    """The bounded variables that are used to instantiate the behavior. If deferred_execution is True, the values are ObjectOrValueOutputExpression, otherwise Variable or ObjectConstant."""

    # NOTE: the field name is spelled ``defered_execution`` (single "r"); the docstring of
    # ``bounded_variables`` above refers to this same flag as ``deferred_execution``.
    defered_execution: bool = False
    """Whether the behavior should be executed in deferred mode."""
def match_applicable_behaviors(
    domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
    return_all_candidates: bool = True
) -> List[ApplicableBehaviorItem]:
    """Collect every behavior in the domain whose declared goal matches ``goal``.

    A behavior matches when its goal is not a null expression and ``surface_fol_downcast`` of the
    behavior goal against ``goal`` yields a variable binding. Binding values that are ``Variable``
    instances whose name appears in ``goal_scope`` are replaced by the corresponding scope value.

    Args:
        domain: the domain whose ``behaviors`` are scanned.
        state: currently unused by this function.
        goal: the goal expression to match against.
        goal_scope: mapping from variable names to their values in the scope of the goal.
        return_all_candidates: currently unused (see the TODO below); all matches are returned.

    Returns:
        One :class:`ApplicableBehaviorItem` per matching behavior, in the iteration order of
        ``domain.behaviors``.
    """
    matched: List[ApplicableBehaviorItem] = []

    for candidate in domain.behaviors.values():
        behavior_goal = candidate.goal
        if is_null_expression(behavior_goal):
            continue

        binding = surface_fol_downcast(behavior_goal, goal)
        if binding is None:
            continue

        # TODO(Jiayuan Mao @ 2024/07/18): think about if this "promoted" variable binding is correct.
        # My current intuition is that, if a variable got changed to a different value later, this procedure will break...
        # The binding is updated in place: only existing keys are re-assigned, so iterating over a
        # snapshot of the items is equivalent to iterating over the live view.
        for key, bound in list(binding.items()):
            if isinstance(bound, Variable) and bound.name in goal_scope:
                binding[key] = goal_scope[bound.name]

        matched.append(ApplicableBehaviorItem(candidate, binding))

    # TODO: implement return_all_candidates
    return matched
def match_policy_applicable_behaviors(
    domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
    return_all_candidates: bool = True,
    pachieve_kwargs: Dict[str, Any] = None,
) -> List[ApplicableBehaviorItem]:
    """Match behaviors for a goal, decomposing conjunctions/disjunctions when nothing matches directly.

    The function first delegates to :func:`match_applicable_behaviors`. If that yields at least one
    candidate, the candidates are returned unchanged. Otherwise the goal must be a ``BoolExpression``
    with operator ``AND`` or ``OR``; a synthetic behavior named ``'__pachive__'`` is created whose
    body achieves the individual arguments of the goal:

    - ``AND``: a single behavior achieving all arguments, sequentially if ``pachieve_kwargs['ordered']``
      is truthy and unordered otherwise.
    - ``OR``: one behavior per argument.

    In both cases the body is wrapped as a plain sequential suite when ``pachieve_kwargs['serializable']``
    is truthy, and as a sequential suite around a promotable suite (with ``_skip_simplify=True``) otherwise.

    Args:
        domain: the domain whose behaviors are scanned.
        state: forwarded to :func:`match_applicable_behaviors`.
        goal: the goal expression.
        goal_scope: mapping from variable names to values in the scope of the goal.
        return_all_candidates: forwarded to :func:`match_applicable_behaviors`.
        pachieve_kwargs: extra keyword arguments forwarded to every ``CrowAchieveExpression``. Although
            the default is ``None``, the fallback path unpacks this mapping and reads its ``'ordered'``
            (``AND`` only) and ``'serializable'`` entries, so it must be provided whenever no behavior
            matches the goal directly.

    Returns:
        The list of applicable behavior items.

    Raises:
        ValueError: if no behavior matches and the goal is neither a conjunction nor a disjunction.
    """
    direct_matches = match_applicable_behaviors(domain, state, goal, goal_scope, return_all_candidates)
    if direct_matches:
        return direct_matches

    free_variables = find_free_variables(goal)
    bounded_variables = {variable.name: variable for variable in free_variables}

    def achieve_each_argument():
        # One policy-achieve statement per argument of the Boolean goal.
        return [
            CrowAchieveExpression(argument, once=False, is_policy_achieve=True, **pachieve_kwargs)
            for argument in goal.arguments
        ]

    def wrap_body(body, serializable):
        # Serializable bodies are plain sequential suites; others are additionally marked promotable.
        if serializable:
            return CrowBehaviorOrderingSuite.make_sequential(body)
        promotable = CrowBehaviorOrderingSuite.make_promotable(body)
        return CrowBehaviorOrderingSuite.make_sequential(promotable, _skip_simplify=True)

    def as_item(body):
        synthetic_behavior = CrowBehavior('__pachive__', free_variables, None, body)
        return ApplicableBehaviorItem(synthetic_behavior, bounded_variables)

    goal_is_bool = isinstance(goal, BoolExpression)

    if goal_is_bool and goal.bool_op is BoolOpType.AND:
        achieves = achieve_each_argument()
        if pachieve_kwargs['ordered']:
            combined = CrowBehaviorOrderingSuite.make_sequential(achieves)
        else:
            combined = CrowBehaviorOrderingSuite.make_unordered(achieves)
        return [as_item(wrap_body(combined, bool(pachieve_kwargs['serializable'])))]

    if goal_is_bool and goal.bool_op is BoolOpType.OR:
        achieves = achieve_each_argument()
        # Read the flag once, before wrapping, so a missing entry fails even for an empty disjunction.
        serializable = bool(pachieve_kwargs['serializable'])
        bodies = [wrap_body(achieve, serializable) for achieve in achieves]
        return [as_item(body) for body in bodies]

    raise ValueError(f"Goal type {goal} is not supported.")
def crow_replace_expression_variables_ext(
    expr: Union[Expression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression],
    mappings: Optional[Dict[Union[FunctionApplicationExpression, VariableExpression], Union[Variable, ValueOutputExpression]]] = None,
    ctx: Optional[ExpressionDefinitionContext] = None,
) -> Union[ObjectOrValueOutputExpression, VariableAssignmentExpression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression]:
    """Apply :func:`crow_replace_expression_variables` to an expression or a behavior statement.

    Plain ``Expression`` objects are passed straight to ``crow_replace_expression_variables``. For the
    supported behavior statements, a new statement of the same class is constructed whose inner
    expression(s) have been substituted; the statement's own variables, controller or behavior
    reference are carried over unchanged.

    Args:
        expr: the expression or behavior statement to rewrite.
        mappings: the substitution mapping, forwarded as-is.
        ctx: the expression definition context, forwarded as-is.

    Returns:
        The rewritten expression or a newly constructed statement.

    Raises:
        ValueError: if ``expr`` is none of the supported types.
    """

    def substitute(inner):
        return crow_replace_expression_variables(inner, mappings, ctx)

    def substitute_all(arguments):
        return [substitute(argument) for argument in arguments]

    # The order of the checks below is kept as in the original dispatch.
    if isinstance(expr, Expression):
        return substitute(expr)

    if isinstance(expr, CrowBindExpression):
        new_goal = substitute(expr.goal)
        return CrowBindExpression(expr.variables, new_goal)

    if isinstance(expr, CrowAssertExpression):
        new_condition = substitute(expr.bool_expr)
        return CrowAssertExpression(new_condition, once=expr.once)

    if isinstance(expr, CrowRuntimeAssignmentExpression):
        new_value = substitute(expr.value)
        return CrowRuntimeAssignmentExpression(expr.variable, new_value)

    if isinstance(expr, CrowFeatureAssignmentExpression):
        new_feature = substitute(expr.feature)
        new_value = substitute(expr.value)
        return CrowFeatureAssignmentExpression(new_feature, new_value)

    if isinstance(expr, CrowControllerApplicationExpression):
        return CrowControllerApplicationExpression(expr.controller, substitute_all(expr.arguments))

    if isinstance(expr, CrowAchieveExpression):
        # NOTE(review): only ``once`` is forwarded to the rebuilt statement. Other constructor options
        # (e.g. ``is_policy_achieve``, used elsewhere in this module) are not copied -- confirm intended.
        new_goal = substitute(expr.goal)
        return CrowAchieveExpression(new_goal, once=expr.once)

    if isinstance(expr, CrowUntrackExpression):
        return CrowUntrackExpression(substitute(expr.goal))

    if isinstance(expr, CrowBehaviorApplicationExpression):
        return CrowBehaviorApplicationExpression(expr.behavior, substitute_all(expr.arguments))

    if isinstance(expr, CrowBehaviorEffectApplicationExpression):
        return CrowBehaviorEffectApplicationExpression(expr.behavior, substitute_all(expr.arguments))

    raise ValueError(f'Invalid expression ({type(expr)}): {expr}')
def format_behavior_statement(
    program: Union[
        Expression, CrowBehaviorOrderingSuite, CrowBindExpression, CrowAssertExpression, CrowControllerApplicationExpression, CrowFeatureAssignmentExpression,
        CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorEffectApplicationExpression, CrowBehaviorApplicationExpression
    ],
    scopes: Optional[dict], scope_id: Optional[int] = None
) -> str:
    """Format a single behavior statement (or a nested suite) as a human-readable string.

    Ordering suites are delegated to :func:`format_behavior_program`. All other statements are
    rendered in one of two modes:

    - **Annotated mode** (``scopes is None`` and ``scope_id is not None``): the statement is printed
      without variable substitution and tagged with ``@scope_id``. Bodies of loops and conditionals
      are not indented in this mode.
    - **Substituted mode** (all other cases): if both ``scopes`` and ``scope_id`` are given and
      ``scopes`` has an entry for ``scope_id``, every name in that entry is turned into a mapping
      ``VariableExpression(Variable(name, AutoType)) -> ObjectConstantExpression(value)`` which is
      applied to the printed expressions. If there is no such entry, the statement is printed
      without substitution. Bodies of loops and conditionals are indented with ``indent_text``.

    Args:
        program: the statement to format.
        scopes: mapping from scope identifiers to ``{variable name: value}`` dictionaries, or None.
        scope_id: identifier of the scope the statement lives in, or None.

    Returns:
        The formatted statement.

    Raises:
        ValueError: if the statement type is not supported.
    """
    if isinstance(program, CrowBehaviorOrderingSuite):
        return format_behavior_program(program, scopes)

    if scopes is None and scope_id is not None:
        def annotated_body(statements):
            return '\n'.join(format_behavior_statement(stmt, scopes, scope_id) for stmt in statements)

        if isinstance(program, Expression):
            return str(program) + f'@({scope_id})'
        if isinstance(program, CrowBindExpression):
            return f'bind@{scope_id} {", ".join(str(var) for var in program.variables)} <- {crow_replace_expression_variables(program.goal)}'
        if isinstance(program, CrowAssertExpression):
            return f'{program.op_str}@{scope_id} {str(program.bool_expr)}'
        if isinstance(program, CrowRuntimeAssignmentExpression):
            return f'assign@{scope_id} {program.variable} <- {program.value}'
        if isinstance(program, CrowControllerApplicationExpression):
            return f'do@{scope_id} {str(program)}'
        if isinstance(program, CrowFeatureAssignmentExpression):
            return f'assign_feature@{scope_id} {program.feature} <- {program.value}'
        if isinstance(program, CrowAchieveExpression):
            return f'{program.op_str}@{scope_id} {str(program.goal)}'
        if isinstance(program, CrowUntrackExpression):
            return f'untrack@{scope_id} {str(program.goal)}'
        if isinstance(program, CrowBehaviorApplicationExpression):
            return f'apply@{scope_id} {str(program)}'
        if isinstance(program, CrowBehaviorEffectApplicationExpression):
            return f'effect@{scope_id} {str(program)}'
        if isinstance(program, CrowBehaviorCommit):
            return 'commit'
        if isinstance(program, CrowBehaviorForeachLoopSuite):
            body_str = annotated_body(program.statements)
            return f'foreach@{scope_id} {program.variable} {{\n{body_str}\n}}'
        if isinstance(program, CrowBehaviorWhileLoopSuite):
            body_str = annotated_body(program.statements)
            return f'while@{scope_id} {str(program.condition)} {{\n{body_str}\n}}'
        if isinstance(program, CrowBehaviorConditionSuite):
            body_str = annotated_body(program.statements)
            if program.else_statements is None:
                return f'if@{scope_id} {str(program.condition)} {{\n{body_str}\n}}'
            else_body_str = annotated_body(program.else_statements)
            return f'if@{scope_id} {str(program.condition)} {{\n{body_str}\n}} else {{\n{else_body_str}\n}}'
        raise ValueError(f'Invalid program ({type(program)}): {program}')

    scope = None
    if scope_id is not None and scopes is not None:
        scope = scopes.get(scope_id, None)
        # ``scopes`` may have no entry for ``scope_id``; in that case keep ``scope`` as None so that
        # the statement is printed without substitution instead of failing on ``None.items()``.
        if scope is not None:
            scope = {VariableExpression(Variable(name, AutoType)): ObjectConstantExpression(value) for name, value in scope.items()}

    def indented_body(statements):
        return indent_text('\n'.join(format_behavior_statement(stmt, scopes, scope_id) for stmt in statements))

    if isinstance(program, Expression):
        if scope is None:
            return str(program)
        return str(crow_replace_expression_variables(program, scope))
    if isinstance(program, CrowBindExpression):
        return f'bind {", ".join(str(var) for var in program.variables)} <- {crow_replace_expression_variables(program.goal, scope)}'
    if isinstance(program, CrowAssertExpression):
        return f'{program.op_str} {crow_replace_expression_variables(program.bool_expr, scope)}'
    if isinstance(program, CrowRuntimeAssignmentExpression):
        return f'assign {program.variable} <- {crow_replace_expression_variables(program.value, scope)}'
    if isinstance(program, CrowControllerApplicationExpression):
        return f'do {crow_replace_expression_variables_ext(program, scope)}'
    if isinstance(program, CrowFeatureAssignmentExpression):
        return f'assign_feature {crow_replace_expression_variables(program.feature, scope)} <- {crow_replace_expression_variables(program.value, scope)}'
    if isinstance(program, CrowAchieveExpression):
        return f'{program.op_str} {crow_replace_expression_variables(program.goal, scope)}'
    if isinstance(program, CrowUntrackExpression):
        return f'untrack {crow_replace_expression_variables(program.goal, scope)}'
    if isinstance(program, CrowBehaviorApplicationExpression):
        return f'apply {crow_replace_expression_variables_ext(program, scope)}'
    if isinstance(program, CrowBehaviorEffectApplicationExpression):
        return f'effect {crow_replace_expression_variables_ext(program, scope)}'
    if isinstance(program, CrowBehaviorCommit):
        return 'commit'
    if isinstance(program, CrowBehaviorForeachLoopSuite):
        body_str = indented_body(program.statements)
        return f'foreach {program.variable} {{\n{body_str}\n}}'
    if isinstance(program, CrowBehaviorWhileLoopSuite):
        # While-loops are handled in annotated mode above; format them here as well (mirroring the
        # layout used for conditionals) rather than rejecting them as an invalid program.
        body_str = indented_body(program.statements)
        return f'while {crow_replace_expression_variables(program.condition, scope)} {{\n{body_str}\n}}'
    if isinstance(program, CrowBehaviorConditionSuite):
        body_str = indented_body(program.statements)
        if program.else_statements is None:
            return f'if {crow_replace_expression_variables(program.condition, scope)} {{\n{body_str}\n}}'
        else_body_str = indented_body(program.else_statements)
        return f'if {crow_replace_expression_variables(program.condition, scope)} {{\n{body_str}\n}} else {{\n{else_body_str}\n}}'
    raise ValueError(f'Invalid program ({type(program)}): {program}')
def format_behavior_program(program: CrowBehaviorOrderingSuite, scopes: Optional[dict], flatten=False) -> str:
    """Format an ordering suite as a braced, indented block of statements.

    Args:
        program: the ordering suite to format.
        scopes: mapping from scope identifiers to variable scopes, forwarded to
            :func:`format_behavior_statement`.
        flatten: if True, all statements yielded by ``program.iter_statements_with_scope()`` are
            formatted with their own scope identifier, sorted as strings, and printed inside a single
            ``unordered{ ... }`` block. If False, the block is headed by ``program.order.value`` and
            nested ordering suites are formatted recursively (without flattening).

    Returns:
        The formatted program.
    """
    if flatten:
        rendered_statements = sorted(
            format_behavior_statement(stmt, scopes, scope_id)
            for stmt, scope_id in program.iter_statements_with_scope()
        )
        return 'unordered{\n' + indent_text('\n'.join(rendered_statements)) + '\n}'

    pieces = [f'{program.order.value}{{\n']
    for stmt in program.statements:
        if isinstance(stmt, CrowBehaviorOrderingSuite):
            rendered = f'{format_behavior_program(stmt, scopes)}'
        else:
            rendered = format_behavior_statement(stmt, scopes, program.variable_scope_identifier)
        pieces.append(indent_text(rendered) + '\n')
    pieces.append('}')
    return ''.join(pieces)
def execute_effect_statements(
    executor: CrowExecutor, statements: Sequence[Union[CrowFeatureAssignmentExpression, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite]],
    state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
    scope: Optional[dict] = None, action_index: Optional[int] = None
) -> None:
    """Execute a sequence of effect statements against ``state``.

    Supported statements:

    - ``CrowFeatureAssignmentExpression``: executed as an ``AssignExpression`` inside
      ``executor.update_effect_mode(stmt.evaluation_mode, action_index=action_index)``.
    - ``CrowBehaviorForeachLoopSuite``: the body is executed once per value of the loop variable. The
      values come from evaluating ``stmt.loop_in_expression`` when ``stmt.is_foreach_in_expression`` is
      set, and otherwise from the names in ``state.object_type2name[var.typename]``.
    - ``CrowBehaviorConditionSuite``: the condition is evaluated and either the body or (if present)
      the else-body is executed.

    Args:
        executor: the executor used to evaluate expressions and apply assignments.
        statements: the statements to execute, in order.
        state: the state passed to the executor for every evaluation and assignment.
        csp: optional constraint satisfaction problem forwarded to the executor. It is not forwarded
            when evaluating the condition of a conditional statement.
        scope: the bounded variables visible to the statements; an empty dict is used if None.
        action_index: forwarded to ``executor.update_effect_mode`` for every feature assignment,
            including assignments nested inside loops and conditionals.

    Raises:
        NotImplementedError: if a condition evaluates to an ``OptimisticValue``.
        ValueError: if a statement has an unsupported type.
    """
    if scope is None:
        scope = dict()

    for stmt in statements:
        if isinstance(stmt, CrowFeatureAssignmentExpression):
            with executor.update_effect_mode(stmt.evaluation_mode, action_index=action_index):
                executor.execute(AssignExpression(stmt.feature, stmt.value), state=state, csp=csp, bounded_variables=scope)
        elif isinstance(stmt, CrowBehaviorForeachLoopSuite) and stmt.is_foreach_in_expression:
            var = stmt.variable
            values = executor.execute(stmt.loop_in_expression, state=state, csp=csp, bounded_variables=scope).values
            for value in values:
                new_scope = scope.copy()
                new_scope[var.name] = value
                # Nested statements must run under the same action index as top-level ones.
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope, action_index=action_index)
        elif isinstance(stmt, CrowBehaviorForeachLoopSuite):
            var = stmt.variable
            # A single copy of the scope is shared by all iterations; only the loop variable is rebound.
            new_scope = scope.copy()
            for name in state.object_type2name[var.typename]:
                new_scope[var.name] = ObjectConstant(name, var.dtype)
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope, action_index=action_index)
        elif isinstance(stmt, CrowBehaviorConditionSuite):
            rv = executor.execute(stmt.condition, state=state, bounded_variables=scope).item()
            if isinstance(rv, OptimisticValue):
                raise NotImplementedError('OptimisticValue is not supported in the current implementation.')
            if bool(rv):
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=scope, action_index=action_index)
            elif stmt.else_statements is not None:
                execute_effect_statements(executor, stmt.else_statements, state, csp=csp, scope=scope, action_index=action_index)
        else:
            raise ValueError(f'Unsupported statement type: {type(stmt)}')
def execute_behavior_effect_advanced(
    executor: CrowExecutor, behavior: Union[CrowBehavior, CrowController], state: CrowState, scope: dict,
    csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None
) -> CrowState:
    """Execute a behavior effect with for-loops and conditional statements.

    The effect statements of ``behavior`` (``behavior.effect_body.statements``) are executed on a
    clone of ``state`` via :func:`execute_effect_statements`.

    Args:
        executor: the executor used to run the effect statements.
        behavior: the behavior or controller whose effect body is executed.
        state: the input state; it is cloned before the effect is executed.
        scope: the bounded variables for the effect statements.
        csp: optional constraint satisfaction problem forwarded to the executor.
        action_index: optional action index forwarded to :func:`execute_effect_statements`.

    Returns:
        The cloned state on which the effect statements were executed.
    """
    successor_state = state.clone()
    effect_statements = behavior.effect_body.statements
    execute_effect_statements(
        executor, effect_statements, successor_state,
        csp=csp, scope=scope, action_index=action_index
    )
    return successor_state
def execute_effect_applier(
    executor: CrowExecutor, applier: CrowEffectApplier, state: CrowState,
    csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None
) -> CrowState:
    """Execute the statements of an effect applier on ``state`` and return that state.

    The statements in ``applier.statements`` are run through :func:`execute_effect_statements` with
    ``applier.bounded_variables`` as the scope. Unlike :func:`execute_behavior_effect_advanced`, no
    clone is made here: the executor operates on the ``state`` object that was passed in.

    Args:
        executor: the executor used to run the effect statements.
        applier: the effect applier providing the statements and their bounded variables.
        state: the state the statements are executed on.
        csp: optional constraint satisfaction problem forwarded to the executor.
        action_index: optional action index forwarded to :func:`execute_effect_statements`.

    Returns:
        The same ``state`` object that was passed in, after the statements have been executed.
    """
    # ``execute_effect_statements`` has no return value, so return the state explicitly to honour
    # the declared ``CrowState`` return type.
    execute_effect_statements(
        executor, applier.statements, state,
        csp=csp, scope=applier.bounded_variables, action_index=action_index
    )
    return state