#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : regression_planning.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from dataclasses import dataclass
from typing import Any, Optional, Union, Tuple, List, Dict, NamedTuple, Type
from concepts.dsl.dsl_types import ObjectConstant
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.expression import ValueOutputExpression, is_and_expr, ObjectOrValueOutputExpression, VariableAssignmentExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference, StateObjectList
from concepts.dm.crow.crow_domain import CrowDomain, CrowProblem, CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.behavior import CrowBehavior
from concepts.dm.crow.behavior import CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression
from concepts.dm.crow.behavior import CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorCommit
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface
__all__ = ['SupportedCrowExpressionType', 'ScopedCrowExpression', 'CrowPlanningResult', 'CrowRegressionPlanner', 'crow_regression', 'get_crow_regression_algorithm', 'set_crow_regression_algorithm']
SupportedCrowExpressionType = Union[
CrowBehaviorOrderingSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite,
CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowRuntimeAssignmentExpression,
CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression,
CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression,
CrowBehaviorCommit
]
[docs]
@dataclass(frozen=True)
class ScopedCrowExpression(object):
"""A statement in the right stack of the planning state."""
statement: SupportedCrowExpressionType
"""The statement."""
scope_id: int
"""The scope id of the statement."""
_UNSET = object()
[docs]
@dataclass(frozen=True)
class CrowPlanningResult(object):
state: CrowState
csp: Optional[ConstraintSatisfactionProblem]
controller_actions: Tuple[CrowControllerApplier, ...]
scopes: Dict[int, Any]
latest_scope: int
[docs]
@classmethod
def make_empty(cls, state: CrowState) -> 'CrowPlanningResult':
return cls(state, ConstraintSatisfactionProblem(), tuple(), dict(), 0)
[docs]
def clone(self, state=_UNSET, csp=_UNSET, controller_actions=_UNSET, scopes=_UNSET, latest_scope=_UNSET) -> 'CrowPlanningResult':
return CrowPlanningResult(
state=state if state is not _UNSET else self.state,
csp=csp if csp is not _UNSET else self.csp,
controller_actions=controller_actions if controller_actions is not _UNSET else self.controller_actions,
scopes=scopes if scopes is not _UNSET else self.scopes,
latest_scope=latest_scope if latest_scope is not _UNSET else self.latest_scope
)
[docs]
class CrowRegressionPlanner(object):
[docs]
def __init__(
self, executor: CrowExecutor, state: CrowState, goal_expr: Union[str, ValueOutputExpression], *,
simulation_interface: Optional[CrowSimulationControllerInterface] = None,
enable_reordering: bool = True,
max_search_depth: int = 20,
max_beam_size: int = 20,
# Group 1: goal serialization and refinements.
is_goal_ordered: bool = True,
is_goal_serializable: bool = True,
is_goal_refinement_compressible: bool = True,
# Group 2: CSP solver.
enable_csp: bool = True,
max_csp_trials: int = 10,
max_global_csp_trials: int = 100,
max_csp_branching_factor: int = 5,
use_generator_manager: bool = False,
store_generator_manager_history: bool = False,
# Group 3: output format.
include_effect_appliers: bool = False,
include_dependency_trace: bool = False,
verbose: bool = True
):
"""Initialize the planner.
Args:
executor: the executor.
state: the initial state.
goal_expr: the goal expression.
simulation_interface: the simulation interface.
enable_reordering: whether to enable reordering.
max_search_depth: the maximum search depth.
max_beam_size: the maximum beam size.
is_goal_ordered: whether the goal is ordered.
is_goal_serializable: whether the goal is serializable.
is_goal_refinement_compressible: whether the goal refinement is compressible.
enable_csp: whether to enable the CSP solver.
max_csp_trials: the maximum number of CSP trials.
max_global_csp_trials: the maximum number of global CSP trials.
max_csp_branching_factor: the maximum CSP branching factor.
use_generator_manager: whether to use the generator manager.
store_generator_manager_history: whether to store the generator manager history.
include_effect_appliers: whether to include the effect appliers in the search result.
The effect appliers are of type :class:`~concepts.dm.crow.behavior.CrowEffectApplier`.
include_dependency_trace: whether to include the dependency graph in the search result.
verbose: whether to output verbose information.
"""
self.executor = executor
self.state = state
self.goal_expr = goal_expr
if isinstance(self.goal_expr, str):
self.goal_expr = executor.parse(self.goal_expr, state=state)
self.simulation_interface = simulation_interface
self.max_search_depth = max_search_depth
self.max_beam_size = max_beam_size
self.enable_reordering = enable_reordering
self.is_goal_ordered = is_goal_ordered
self.is_goal_serializable = is_goal_serializable
self.is_goal_refinement_compressible = is_goal_refinement_compressible
self.enable_csp = enable_csp
self.max_csp_trials = max_csp_trials
self.max_global_csp_trials = max_global_csp_trials
self.max_csp_branching_factor = max_csp_branching_factor
self.use_generator_manager = use_generator_manager
self.store_generator_manager_history = store_generator_manager_history
self.include_effect_appliers = include_effect_appliers
self.include_dependency_trace = include_dependency_trace
self.verbose = verbose
self._search_cache = dict()
self._search_stat = {'nr_expanded_nodes': 0}
self._results = list()
self._post_init()
@property
def domain(self) -> CrowDomain:
return self.executor.domain
def _post_init(self) -> None:
pass
def _make_goal_program(self):
if self.domain.has_behavior('__goal__'):
return self.domain.get_behavior('__goal__').body
if is_and_expr(self.goal_expr):
if len(self.goal_expr.arguments) == 1 and self.goal_expr.arguments[0].return_type.is_list_type:
goal_set = [self.goal_expr]
else:
goal_set = list(self.goal_expr.arguments)
else:
goal_set = [self.goal_expr]
goal_set = [CrowAchieveExpression(x) for x in goal_set]
assert_expr = CrowAssertExpression(self.goal_expr)
if self.is_goal_ordered:
goal_set = CrowBehaviorOrderingSuite.make_sequential(goal_set, variable_scope_identifier=0)
else:
goal_set = CrowBehaviorOrderingSuite.make_unordered(goal_set, variable_scope_identifier=0)
if self.is_goal_serializable:
program = CrowBehaviorOrderingSuite.make_sequential(
goal_set,
assert_expr,
CrowBehaviorCommit(csp=True, sketch=True, behavior=True),
variable_scope_identifier=0
)
else:
program = CrowBehaviorOrderingSuite.make_sequential(
CrowBehaviorOrderingSuite.make_promotable(goal_set),
assert_expr,
CrowBehaviorCommit(csp=True, sketch=True, behavior=True),
variable_scope_identifier=0
)
return program
@property
def search_stat(self) -> dict:
return self._search_stat
@property
def results(self) -> List[CrowPlanningResult]:
return self._results
[docs]
def set_results(self, results: List[CrowPlanningResult]) -> None:
self._results = results
[docs]
def main(self) -> Tuple[List[Tuple[CrowControllerApplier, ...]], dict]:
program = self._make_goal_program()
behavior_application = CrowBehaviorApplicationExpression(
CrowBehavior('__goal__', [], None, program, [], CrowBehaviorOrderingSuite.make_sequential()),
[]
)
program = CrowBehaviorOrderingSuite.make_sequential(behavior_application, variable_scope_identifier=0)
candidate_plans = self.main_entry(program)
return candidate_plans, self._search_stat
[docs]
def evaluate(
self, expression: Union[ObjectOrValueOutputExpression, VariableAssignmentExpression], state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
bounded_variables: Optional[Dict[str, Union[TensorValue, ObjectConstant]]] = None,
clone_csp: bool = True,
force_tensor_value: bool = False
) -> Tuple[Union[None, StateObjectReference, StateObjectList, TensorValue, OptimisticValue], Optional[ConstraintSatisfactionProblem]]:
"""Evaluate an expression and return the result.
Args:
expression: the expression to evaluate.
state: the current state.
csp: the current CSP.
bounded_variables: the bounded variables.
clone_csp: whether to clone the CSP.
force_tensor_value: whether to force the result to be a tensor value.
Returns:
the evaluation result and the updated CSP.
"""
if clone_csp:
csp = csp.clone() if csp is not None else None
if isinstance(expression, VariableAssignmentExpression):
self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
return None, csp
rv = self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
if isinstance(rv, TensorValue):
if force_tensor_value:
return rv, csp
if rv.is_scalar:
return rv.item(), csp
return rv, csp
assert isinstance(rv, StateObjectReference) or isinstance(rv, StateObjectList)
return rv, csp
[docs]
def main_entry(self, state: CrowBehaviorOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]:
raise NotImplementedError()
g_crow_regression_algorithm = 'iddfs_v1'
[docs]
def get_crow_regression_algorithm() -> Type[CrowRegressionPlanner]:
if g_crow_regression_algorithm == 'dfs_v1':
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_v1 import CrowRegressionPlannerDFSv1
return CrowRegressionPlannerDFSv1
if g_crow_regression_algorithm == 'bfs_v1':
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_v1 import CrowRegressionPlannerBFSv1
return CrowRegressionPlannerBFSv1
if g_crow_regression_algorithm == 'dfs_v2':
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_dfs_v2 import CrowRegressionPlannerDFSv2
return CrowRegressionPlannerDFSv2
if g_crow_regression_algorithm == 'astar_v2':
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v2 import CrowRegressionPlannerAStarv2
return CrowRegressionPlannerAStarv2
if g_crow_regression_algorithm == 'iddfs_v1':
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_iddfs_v1 import CrowRegressionPlannerIDDFSv1
return CrowRegressionPlannerIDDFSv1
raise ValueError(f'Unknown regression algorithm: {g_crow_regression_algorithm}')
[docs]
def set_crow_regression_algorithm(algorithm: str) -> None:
global g_crow_regression_algorithm
if algorithm not in {'dfs_v1', 'bfs_v1', 'dfs_v2', 'astar_v2'}:
raise ValueError(f'Unknown regression algorithm: {algorithm}')
g_crow_regression_algorithm = algorithm
[docs]
def crow_regression(domain_or_executor: Union[CrowExecutor, CrowDomain], problem: CrowProblem, goal: Optional[Union[str, ValueOutputExpression]] = None, return_planner: bool = False, **kwargs) -> Union[Tuple[list, dict], CrowRegressionPlanner]:
if isinstance(domain_or_executor, CrowExecutor):
executor = domain_or_executor
elif isinstance(domain_or_executor, CrowDomain):
executor = domain_or_executor.make_executor()
else:
raise ValueError(f'Unknown domain or executor: {domain_or_executor}')
algo = get_crow_regression_algorithm()
planner = algo(executor, problem.state, goal if goal is not None else problem.goal, **kwargs)
# planner.set_human_control_interface(True)
if return_planner:
return planner
return planner.main()