跳转至

智能体

AgentClass module-attribute

AgentClass = TypeVar('AgentClass', bound=IAgent)

AgentId module-attribute

AgentId: TypeAlias = Union[int, str]

AgentSelectionStrategyName module-attribute

AgentSelectionStrategyName: TypeAlias = Literal['距离近优先', '空闲优先', '自定义']

DEFAULT_PYDANTIC_MODEL_CONFIG module-attribute

DEFAULT_PYDANTIC_MODEL_CONFIG = ConfigDict(strict=False, arbitrary_types_allowed=True, extra='ignore', validate_by_alias=True, validate_by_name=True, coerce_numbers_to_str=True, alias_generator=camelize, populate_by_name=True)

GoHomeStrategyFrequency module-attribute

GoHomeStrategyFrequency: TypeAlias = Literal['每次', '如果无其他任务']

LocationType module-attribute

LocationType = Union[Point, Storable]

Predicate module-attribute

Predicate: TypeAlias = Callable[[T], bool]

T module-attribute

T = TypeVar('T')

TaskType module-attribute

TaskType = TypeVar('TaskType')

AgentIdleStrategy

智能体空闲策略

源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
class AgentIdleStrategy(IExpose):
    """
    智能体空闲策略
    """

    def __init__(self, agent: Optional[AgentClass] = None, **kwargs) -> None:
        """
        初始化智能体空闲策略

        Args:
            agent: 智能体,是否要传取决于此策略是否仅针对此智能体
        """
        super().__init__(**kwargs)
        self.agent = agent

    def is_idle(self, agent: Optional[AgentClass] = None, **kwargs) -> bool:
        """
        判断智能体是否空闲

        Args:
            agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
        """
        raise NotImplementedError("is_idle方法尚未实现")

    @abstractmethod
    def on_idle(
        self,
        agent: Optional[AgentClass] = None,
        is_idle: Optional[Predicate[AgentClass]] = None,
        **kwargs,
    ) -> Generator[simpy.Event, Any, None]:
        """
        智能体做完任务之后的回调逻辑

        Args:
            agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
            is_idle: 判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断
        """

agent instance-attribute

agent = agent

__init__

__init__(agent: Optional[AgentClass] = None, **kwargs) -> None

初始化智能体空闲策略

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

智能体,是否要传取决于此策略是否仅针对此智能体

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def __init__(self, agent: Optional[AgentClass] = None, **kwargs) -> None:
    """
    初始化智能体空闲策略

    Args:
        agent: 智能体,是否要传取决于此策略是否仅针对此智能体
    """
    super().__init__(**kwargs)
    self.agent = agent

is_idle

is_idle(agent: Optional[AgentClass] = None, **kwargs) -> bool

判断智能体是否空闲

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

如果不传则使用初始化时传的智能体,否则使用传参的智能体

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def is_idle(self, agent: Optional[AgentClass] = None, **kwargs) -> bool:
    """
    判断智能体是否空闲

    Args:
        agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
    """
    raise NotImplementedError("is_idle方法尚未实现")

on_idle abstractmethod

on_idle(agent: Optional[AgentClass] = None, is_idle: Optional[Predicate[AgentClass]] = None, **kwargs) -> Generator[simpy.Event, Any, None]

智能体做完任务之后的回调逻辑

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

如果不传则使用初始化时传的智能体,否则使用传参的智能体

None
is_idle Optional[Predicate[AgentClass]]

判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
@abstractmethod
def on_idle(
    self,
    agent: Optional[AgentClass] = None,
    is_idle: Optional[Predicate[AgentClass]] = None,
    **kwargs,
) -> Generator[simpy.Event, Any, None]:
    """
    智能体做完任务之后的回调逻辑

    Args:
        agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
        is_idle: 判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断
    """

AgentManifest

源代码位于: logis/biz/sim/agent/model/__init__.py
class AgentManifest(BaseModel, Generic[AgentClass, TaskType]):
    agent: Optional[AgentClass] = None
    agent_id: Optional[AgentId] = None

    tasks: Union[Iterable[TaskType], Iterator[TaskType]] = []

    __task_iter__: Optional[Iterator[TaskType]] = None

    def get_task_iter(self, is_new: bool = False):
        if is_new or self.__task_iter__ is None:
            self.__task_iter__ = iter(self.tasks)
        return self.__task_iter__

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

__task_iter__ class-attribute instance-attribute

__task_iter__: Optional[Iterator[TaskType]] = None

agent class-attribute instance-attribute

agent: Optional[AgentClass] = None

agent_id class-attribute instance-attribute

agent_id: Optional[AgentId] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

tasks class-attribute instance-attribute

tasks: Union[Iterable[TaskType], Iterator[TaskType]] = []

get_task_iter

get_task_iter(is_new: bool = False)
源代码位于: logis/biz/sim/agent/model/__init__.py
def get_task_iter(self, is_new: bool = False):
    if is_new or self.__task_iter__ is None:
        self.__task_iter__ = iter(self.tasks)
    return self.__task_iter__

DefaultAgentSelectionStrategy

默认智能体选择策略

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
class DefaultAgentSelectionStrategy(IAgentSelectionStrategy):
    """
    默认智能体选择策略
    """

    def request(
        self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
    ):
        """
        选择所有智能体
        """
        agent_pool = agent_pool or self.agent_pool
        assert agent_pool, "未指定资源池,无法申请资源"

        if fast_fail and agent_pool.available_quantity <= 0:
            return None

        try:
            req = agent_pool.do_request_resource()
            resource: Optional["IAgent"] = yield req
        except simpy.Interrupt:
            agent_pool.cancel_request_resource(req)
            resource = None

        if resource:
            agent_pool.after_resource_requested(resource=resource, **kwargs)
        return resource

request

request(agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs)

选择所有智能体

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
def request(
    self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
):
    """
    选择所有智能体
    """
    agent_pool = agent_pool or self.agent_pool
    assert agent_pool, "未指定资源池,无法申请资源"

    if fast_fail and agent_pool.available_quantity <= 0:
        return None

    try:
        req = agent_pool.do_request_resource()
        resource: Optional["IAgent"] = yield req
    except simpy.Interrupt:
        agent_pool.cancel_request_resource(req)
        resource = None

    if resource:
        agent_pool.after_resource_requested(resource=resource, **kwargs)
    return resource

GoHomeStrategy

回到归属地策略

源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
class GoHomeStrategy(AgentIdleStrategy):
    """
    回到归属地策略
    """

    def __init__(
        self,
        frequency: GoHomeStrategyFrequency,
        agent: Optional[AgentClass] = None,
        env: Optional[simpy.Environment] = None,
        **kwargs,
    ) -> None:
        super().__init__(agent=agent, **kwargs)
        self.frequency: GoHomeStrategyFrequency = frequency
        self.env: simpy.Environment = env

    def on_idle(
        self,
        agent: Optional[AgentClass] = None,
        is_idle: Optional[Predicate[AgentClass]] = None,
        **kwargs,
    ):
        """
        默认空闲策略,智能体在空闲时,等待下一个任务
        """
        agent = agent or self.agent
        is_idle = is_idle or self.is_idle
        if self.frequency == "如果无其他任务":
            if is_idle(agent, **kwargs):
                yield from agent.move(target=agent.origin_location, **kwargs)
        elif self.frequency == "每次":
            yield from agent.move(target=agent.origin_location, **kwargs)
        else:
            raise NotImplementedError(f"频率{self.frequency}尚未支持")

env instance-attribute

env: Environment = env

frequency instance-attribute

frequency: GoHomeStrategyFrequency = frequency

__init__

__init__(frequency: GoHomeStrategyFrequency, agent: Optional[AgentClass] = None, env: Optional[Environment] = None, **kwargs) -> None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def __init__(
    self,
    frequency: GoHomeStrategyFrequency,
    agent: Optional[AgentClass] = None,
    env: Optional[simpy.Environment] = None,
    **kwargs,
) -> None:
    super().__init__(agent=agent, **kwargs)
    self.frequency: GoHomeStrategyFrequency = frequency
    self.env: simpy.Environment = env

on_idle

on_idle(agent: Optional[AgentClass] = None, is_idle: Optional[Predicate[AgentClass]] = None, **kwargs)

默认空闲策略,智能体在空闲时,等待下一个任务

源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def on_idle(
    self,
    agent: Optional[AgentClass] = None,
    is_idle: Optional[Predicate[AgentClass]] = None,
    **kwargs,
):
    """
    默认空闲策略,智能体在空闲时,等待下一个任务
    """
    agent = agent or self.agent
    is_idle = is_idle or self.is_idle
    if self.frequency == "如果无其他任务":
        if is_idle(agent, **kwargs):
            yield from agent.move(target=agent.origin_location, **kwargs)
    elif self.frequency == "每次":
        yield from agent.move(target=agent.origin_location, **kwargs)
    else:
        raise NotImplementedError(f"频率{self.frequency}尚未支持")

IAgent

智能体抽象基类

源代码位于: logis/biz/sim/agent/iface/base.py
class IAgent(IBlueprint):
    """
    智能体抽象基类
    """

    @abstractmethod
    def resolve_binding_graph(self, *args, **kwargs) -> Union["IGrid", "ISimPathGraph"]:
        """
        获取智能体绑定的地图
        """
        pass

    @abstractmethod
    def resolve_center_point(self) -> Optional[Point]:
        """
        获取智能体的中心点
        """
        pass

    @property
    @abstractmethod
    def id(self) -> AgentId:
        pass

    @property
    @abstractmethod
    def rack_as_obstacle(self) -> bool:
        """
        是否将其余货架视为障碍物
        """
        pass

    @property
    @abstractmethod
    def agent_as_obstacle(self) -> bool:
        """
        是否将其余智能体视为障碍物
        """
        pass

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.origin_location: Optional[Point] = None
        """初始位置"""
        # TODO: 这里主要是因为Stock是个特例,后续考虑分离
        self.current_location: Optional[LocationType] = None
        """当前位置"""
        self.path_finding_strategy: Optional["PathFindingAlgorithm"] = None
        """寻路策略"""

    @property
    @abstractmethod
    def is_free(self, **kwargs) -> bool:
        """
        判断是否空闲
        """
        pass

    @abstractmethod
    def stop_working(self, *args, **kwargs):
        pass

    @abstractmethod
    def start_working(self, *args, **kwargs):
        pass

    @abstractmethod
    def do_move(
        self, next_point: Point, *args, **kwargs
    ) -> Generator[simpy.Event, Any, None]:
        """
        从当前位置移动到下一个相邻的位置
        """
        pass

    @abstractmethod
    def move(
        self, target: Point, speed: Optional[Union[float, Speed]] = None, **kwargs
    ) -> Generator[simpy.Event, Any, None]:
        """
        从当前位置移动到目标位置,此方法是最顶层的移动方法,内部处理路径规划并逐步调用do_move
        TODO: 考虑将speed等参数作为agent的内部属性,而不是在这里传参
        """
        pass

agent_as_obstacle abstractmethod property

agent_as_obstacle: bool

是否将其余智能体视为障碍物

current_location instance-attribute

current_location: Optional[LocationType] = None

当前位置

id abstractmethod property

id: AgentId

is_free abstractmethod property

is_free: bool

判断是否空闲

origin_location instance-attribute

origin_location: Optional[Point] = None

初始位置

path_finding_strategy instance-attribute

path_finding_strategy: Optional[PathFindingAlgorithm] = None

寻路策略

rack_as_obstacle abstractmethod property

rack_as_obstacle: bool

是否将其余货架视为障碍物

__init__

__init__(*args, **kwargs)
源代码位于: logis/biz/sim/agent/iface/base.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.origin_location: Optional[Point] = None
    """初始位置"""
    # TODO: 这里主要是因为Stock是个特例,后续考虑分离
    self.current_location: Optional[LocationType] = None
    """当前位置"""
    self.path_finding_strategy: Optional["PathFindingAlgorithm"] = None
    """寻路策略"""

do_move abstractmethod

do_move(next_point: Point, *args, **kwargs) -> Generator[simpy.Event, Any, None]

从当前位置移动到下一个相邻的位置

源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def do_move(
    self, next_point: Point, *args, **kwargs
) -> Generator[simpy.Event, Any, None]:
    """
    从当前位置移动到下一个相邻的位置
    """
    pass

move abstractmethod

move(target: Point, speed: Optional[Union[float, Speed]] = None, **kwargs) -> Generator[simpy.Event, Any, None]

从当前位置移动到目标位置,此方法是最顶层的移动方法,内部处理路径规划并逐步调用do_move TODO: 考虑将speed等参数作为agent的内部属性,而不是在这里传参

源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def move(
    self, target: Point, speed: Optional[Union[float, Speed]] = None, **kwargs
) -> Generator[simpy.Event, Any, None]:
    """
    从当前位置移动到目标位置,此方法是最顶层的移动方法,内部处理路径规划并逐步调用do_move
    TODO: 考虑将speed等参数作为agent的内部属性,而不是在这里传参
    """
    pass

resolve_binding_graph abstractmethod

resolve_binding_graph(*args, **kwargs) -> Union[IGrid, ISimPathGraph]

获取智能体绑定的地图

源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def resolve_binding_graph(self, *args, **kwargs) -> Union["IGrid", "ISimPathGraph"]:
    """
    获取智能体绑定的地图
    """
    pass

resolve_center_point abstractmethod

resolve_center_point() -> Optional[Point]

获取智能体的中心点

源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def resolve_center_point(self) -> Optional[Point]:
    """
    获取智能体的中心点
    """
    pass

start_working abstractmethod

start_working(*args, **kwargs)
源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def start_working(self, *args, **kwargs):
    pass

stop_working abstractmethod

stop_working(*args, **kwargs)
源代码位于: logis/biz/sim/agent/iface/base.py
@abstractmethod
def stop_working(self, *args, **kwargs):
    pass

IAgentPool

智能体资源池,生命周期钩子如下: 1. 初始化资源池 2. 在申请资源之前 3. 申请资源 4. 在资源申请到之后 5. 在资源释放之前 6. 释放资源 7. 在资源释放之后

TODO: 继承自ITaskHandler

源代码位于: logis/biz/sim/agent/iface/pool.py
class IAgentPool(ABC):
    """
    智能体资源池,生命周期钩子如下:
    1. 初始化资源池
    2. 在申请资源之前
    3. 申请资源
    4. 在资源申请到之后
    5. 在资源释放之前
    6. 释放资源
    7. 在资源释放之后

    TODO: 继承自ITaskHandler
    """

    @property
    def get_queue(self):
        """
        获取智能体的请求队列
        """
        if self.use_simpy_store:
            return self.__store__.get_queue
        return []

    @property
    def put_queue(self):
        """
        释放智能体的请求队列
        """
        if self.use_simpy_store:
            return self.__store__.put_queue
        return []

    @property
    @abstractmethod
    def env(self) -> simpy.Environment:
        """
        仿真环境实例
        """
        pass

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        self.capacity: Optional[int] = None
        """资源池容量"""

        self.use_simpy_store: bool = False
        """是否使用simpy.Store来模拟资源池"""
        self.__store__: Optional[simpy.Store] = None
        self.__lock__ = simpy.Resource(self.env, 1)
        # 所有资源
        self._all_resources: Dict[AgentId, IAgent] = defaultdict()
        # 使用中的资源,是所有资源的子集,用于记录当前正在使用的资源
        self._locked_resources: Dict[AgentId, IAgent] = defaultdict()

    def init_simpy_store(self):
        """
        初始化simpy资源池
        """
        self.__store__ = simpy.Store(self.env, self.capacity)

    def set_locked(self, resource: AgentClass, **kwargs):
        """
        标记资源为已被锁定

        Args:
            resource: 待标记的资源
            kwargs: 其他参数

        Returns:
            bool: 是否成功标记资源为已被锁定
        """
        self._locked_resources[resource.id] = resource

        return True

    def unset_locked(self, resource: AgentClass, **kwargs):
        """
        取消标记资源为未被锁定

        Args:
            resource: 待取消标记的资源
            kwargs: 其他参数

        Returns:
            bool: 是否成功取消标记资源为未被锁定
        """
        self._locked_resources.pop(resource.id, None)
        return True

    def is_locked(self, resource: AgentClass) -> bool:
        """
        判断资源是否已被锁定

        Args:
            resource: 待判断的资源

        Returns:
            bool: 是否已被锁定
        """
        return resource.id in self._locked_resources

    def cancel_request_resource(self, req: simpy.Event):
        """
        取消申请资源,默认仅实现了use_simpy_store模式

        Args:
            req: 待取消的申请事件
        """
        if self.use_simpy_store:
            self.__store__.get_queue.remove(req)
        else:
            raise NotImplementedError("cancel_request_resource not implemented")

    def cancel_release_resource(self, req: simpy.Event):
        """
        取消释放资源,默认仅实现了use_simpy_store模式

        Args:
            req: 待取消的释放事件
        """
        if self.use_simpy_store:
            self.__store__.put_queue.remove(req)
        else:
            raise NotImplementedError("cancel_release_resource not implemented")

    @property
    def resources(self):
        """
        获取所有的资源列表
        """
        return list(self._all_resources.values())

    @property
    def id_resource_map(self):
        """
        获取智能体id到智能体的映射
        """
        return self._all_resources

    def add_agent(self, agent: IAgent):
        """
        添加智能体到资源池

        Args:
            agent: 智能体

        Raises:
            AssertionError: 如果资源池已满,则无法继续添加智能体

        """
        if self.capacity is not None:
            assert (
                len(self._all_resources) < self.capacity
            ), f"{self}资源池已满{self.capacity},无法添加资源"
        self._all_resources[agent.id] = agent

        if self.use_simpy_store:
            self.__store__.put(agent)

    def is_type_of(self, t: Type) -> bool:
        """
        判断资源池是否是指定类型的资源

        Args:
            t: 资源类型

        Returns:
            bool: 是否是指定类型的资源
        """
        if not self.resources:
            return False
        return isinstance(self.resources[0], t)

    @property
    def available_quantity(self):
        """
        获取资源池中可用的资源数量
        """
        if self.use_simpy_store:
            lx = len(self.__store__.items) if self.__store__ else 0
            return lx
        else:
            lx = None
        ly = len(self._all_resources) - len(self._locked_resources)
        if lx is not None and lx != ly:
            logging.debug(f"两种方式计算的结果不一致,{lx}!={ly}")
        return ly

    def get_resource_by_id(self, id: AgentId):
        """
        根据智能体id获取智能体

        Args:
            id: 智能体id

        Returns:
            Optional[IAgent]: 智能体
        """
        return self._all_resources.get(id, None)

    def before_request_resource(self, **kwargs):
        """
        资源申请之前的回调
        """
        raise NotImplementedError("before_request_resource not implemented")

    def do_request_resource(
        self, **kwargs
    ) -> Generator[simpy.Event, Any, Optional[AgentClass]]:
        """
        真正地执行申请资源
        """
        if self.use_simpy_store:
            assert self.__store__, "未初始化资源池,无法申请资源"
            return self.__store__.get()
        else:
            raise NotImplementedError("do_request_resource not implemented")

    @abstractmethod
    def after_resource_requested(self, resource: AgentClass, **kwargs):
        """
        资源申请到之后的回调

        Args:
            resource: 已申请到的资源
        """
        pass

    @abstractmethod
    def request_resource(
        self,
        fast_fail: bool = False,
        **kwargs,
    ) -> Generator[simpy.Event, Any, Optional[IAgent]]:
        """
        申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
        1. before_request_resource
        2. do_request_resource
        3. after_resource_requested

        如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`request`方法

        Args:
            strategy: 智能体选择策略
            fast_fail: 是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None
            kwargs: 其他参数

        Returns:
            智能体事件生成器
        """

        if fast_fail and self.available_quantity <= 0:
            return None

        self.before_request_resource(**kwargs)
        try:
            req = self.do_request_resource()
            resource: Optional["IAgent"] = yield req
        except simpy.Interrupt:
            self.cancel_request_resource(req)
            resource = None

        if resource:
            self.after_resource_requested(resource=resource, **kwargs)
        return resource

    def before_release_resource(self, resource: AgentClass, **kwargs):
        """
        资源释放之前的回调

        Args:
            resource: 待释放的资源
        """
        raise NotImplementedError("before_release_resource not implemented")

    def do_release_resource(self, resource: AgentClass, **kwargs):
        """
        真正地执行释放资源

        Args:
            resource: 待释放的资源
        """
        if self.use_simpy_store:
            assert self.__store__, "未初始化资源池,无法释放资源"
            return self.__store__.put(resource)
        else:
            raise NotImplementedError("do_release_resource not implemented")

    def after_resource_released(self, resource: AgentClass, **kwargs):
        """
        资源释放之后的回调

        Args:
            resource: 已释放的资源
        """
        raise NotImplementedError("after_resource_released not implemented")

    @abstractmethod
    def release_resource(self, resource: AgentClass, *args, **kwargs):
        """
        释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
        1. before_release_resource
        2. do_release_resource
        3. after_resource_released

        如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`release`方法

        Args:
            resource: 要释放的资源
            kwargs: 其他参数
        """

        agent_pool = self
        agent_pool.before_release_resource(resource=resource, **kwargs)
        e = agent_pool.do_release_resource(resource=resource)
        # TODO: 这里是否不等待就可以
        yield e
        agent_pool.after_resource_released(resource=resource, **kwargs)
        return True

    @abstractmethod
    def assign_resources(
        self, task_type: str, task_keys: List[T], **kwargs
    ) -> Generator[
        simpy.Event, Any, Dict[AgentId, "AgentManifest[AgentClass, TaskType]"]
    ]:
        """
        输入任务分配智能体

        Args:
            task_type: 任务类型
            task_keys: 任务key列表
            kwargs: 其他参数

        Returns:
            智能体id到智能体manifest的映射
        """

__lock__ instance-attribute

__lock__ = Resource(env, 1)

__store__ instance-attribute

__store__: Optional[Store] = None

available_quantity property

available_quantity

获取资源池中可用的资源数量

capacity instance-attribute

capacity: Optional[int] = None

资源池容量

env abstractmethod property

env: Environment

仿真环境实例

get_queue property

get_queue

获取智能体的请求队列

id_resource_map property

id_resource_map

获取智能体id到智能体的映射

put_queue property

put_queue

释放智能体的请求队列

resources property

resources

获取所有的资源列表

use_simpy_store instance-attribute

use_simpy_store: bool = False

是否使用simpy.Store来模拟资源池

__init__

__init__(*args, **kwargs) -> None
源代码位于: logis/biz/sim/agent/iface/pool.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__()
    self.capacity: Optional[int] = None
    """资源池容量"""

    self.use_simpy_store: bool = False
    """是否使用simpy.Store来模拟资源池"""
    self.__store__: Optional[simpy.Store] = None
    self.__lock__ = simpy.Resource(self.env, 1)
    # 所有资源
    self._all_resources: Dict[AgentId, IAgent] = defaultdict()
    # 使用中的资源,是所有资源的子集,用于记录当前正在使用的资源
    self._locked_resources: Dict[AgentId, IAgent] = defaultdict()

add_agent

add_agent(agent: IAgent)

添加智能体到资源池

参数:

名称 类型 描述 默认
agent IAgent

智能体

必需

引发:

类型 描述
AssertionError

如果资源池已满,则无法继续添加智能体

源代码位于: logis/biz/sim/agent/iface/pool.py
def add_agent(self, agent: IAgent):
    """
    添加智能体到资源池

    Args:
        agent: 智能体

    Raises:
        AssertionError: 如果资源池已满,则无法继续添加智能体

    """
    if self.capacity is not None:
        assert (
            len(self._all_resources) < self.capacity
        ), f"{self}资源池已满{self.capacity},无法添加资源"
    self._all_resources[agent.id] = agent

    if self.use_simpy_store:
        self.__store__.put(agent)

after_resource_released

after_resource_released(resource: AgentClass, **kwargs)

资源释放之后的回调

参数:

名称 类型 描述 默认
resource AgentClass

已释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def after_resource_released(self, resource: AgentClass, **kwargs):
    """
    资源释放之后的回调

    Args:
        resource: 已释放的资源
    """
    raise NotImplementedError("after_resource_released not implemented")

after_resource_requested abstractmethod

after_resource_requested(resource: AgentClass, **kwargs)

资源申请到之后的回调

参数:

名称 类型 描述 默认
resource AgentClass

已申请到的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def after_resource_requested(self, resource: AgentClass, **kwargs):
    """
    资源申请到之后的回调

    Args:
        resource: 已申请到的资源
    """
    pass

assign_resources abstractmethod

assign_resources(task_type: str, task_keys: List[T], **kwargs) -> Generator[simpy.Event, Any, Dict[AgentId, AgentManifest[AgentClass, TaskType]]]

输入任务分配智能体

参数:

名称 类型 描述 默认
task_type str

任务类型

必需
task_keys List[T]

任务key列表

必需
kwargs

其他参数

{}

返回:

类型 描述
Dict[AgentId, AgentManifest[AgentClass, TaskType]]

智能体id到智能体manifest的映射

源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def assign_resources(
    self, task_type: str, task_keys: List[T], **kwargs
) -> Generator[
    simpy.Event, Any, Dict[AgentId, "AgentManifest[AgentClass, TaskType]"]
]:
    """
    输入任务分配智能体

    Args:
        task_type: 任务类型
        task_keys: 任务key列表
        kwargs: 其他参数

    Returns:
        智能体id到智能体manifest的映射
    """

before_release_resource

before_release_resource(resource: AgentClass, **kwargs)

资源释放之前的回调

参数:

名称 类型 描述 默认
resource AgentClass

待释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def before_release_resource(self, resource: AgentClass, **kwargs):
    """
    资源释放之前的回调

    Args:
        resource: 待释放的资源
    """
    raise NotImplementedError("before_release_resource not implemented")

before_request_resource

before_request_resource(**kwargs)

资源申请之前的回调

源代码位于: logis/biz/sim/agent/iface/pool.py
def before_request_resource(self, **kwargs):
    """
    资源申请之前的回调
    """
    raise NotImplementedError("before_request_resource not implemented")

cancel_release_resource

cancel_release_resource(req: Event)

取消释放资源,默认仅实现了use_simpy_store模式

参数:

名称 类型 描述 默认
req Event

待取消的释放事件

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def cancel_release_resource(self, req: simpy.Event):
    """
    取消释放资源,默认仅实现了use_simpy_store模式

    Args:
        req: 待取消的释放事件
    """
    if self.use_simpy_store:
        self.__store__.put_queue.remove(req)
    else:
        raise NotImplementedError("cancel_release_resource not implemented")

cancel_request_resource

cancel_request_resource(req: Event)

取消申请资源,默认仅实现了use_simpy_store模式

参数:

名称 类型 描述 默认
req Event

待取消的申请事件

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def cancel_request_resource(self, req: simpy.Event):
    """
    取消申请资源,默认仅实现了use_simpy_store模式

    Args:
        req: 待取消的申请事件
    """
    if self.use_simpy_store:
        self.__store__.get_queue.remove(req)
    else:
        raise NotImplementedError("cancel_request_resource not implemented")

do_release_resource

do_release_resource(resource: AgentClass, **kwargs)

真正地执行释放资源

参数:

名称 类型 描述 默认
resource AgentClass

待释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def do_release_resource(self, resource: AgentClass, **kwargs):
    """
    真正地执行释放资源

    Args:
        resource: 待释放的资源
    """
    if self.use_simpy_store:
        assert self.__store__, "未初始化资源池,无法释放资源"
        return self.__store__.put(resource)
    else:
        raise NotImplementedError("do_release_resource not implemented")

do_request_resource

do_request_resource(**kwargs) -> Generator[simpy.Event, Any, Optional[AgentClass]]

真正地执行申请资源

源代码位于: logis/biz/sim/agent/iface/pool.py
def do_request_resource(
    self, **kwargs
) -> Generator[simpy.Event, Any, Optional[AgentClass]]:
    """
    真正地执行申请资源
    """
    if self.use_simpy_store:
        assert self.__store__, "未初始化资源池,无法申请资源"
        return self.__store__.get()
    else:
        raise NotImplementedError("do_request_resource not implemented")

get_resource_by_id

get_resource_by_id(id: AgentId)

根据智能体id获取智能体

参数:

名称 类型 描述 默认
id AgentId

智能体id

必需

返回:

类型 描述

Optional[IAgent]: 智能体

源代码位于: logis/biz/sim/agent/iface/pool.py
def get_resource_by_id(self, id: AgentId):
    """
    根据智能体id获取智能体

    Args:
        id: 智能体id

    Returns:
        Optional[IAgent]: 智能体
    """
    return self._all_resources.get(id, None)

init_simpy_store

init_simpy_store()

初始化simpy资源池

源代码位于: logis/biz/sim/agent/iface/pool.py
def init_simpy_store(self):
    """
    初始化simpy资源池
    """
    self.__store__ = simpy.Store(self.env, self.capacity)

is_locked

is_locked(resource: AgentClass) -> bool

判断资源是否已被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待判断的资源

必需

返回:

名称 类型 描述
bool bool

是否已被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def is_locked(self, resource: AgentClass) -> bool:
    """
    判断资源是否已被锁定

    Args:
        resource: 待判断的资源

    Returns:
        bool: 是否已被锁定
    """
    return resource.id in self._locked_resources

is_type_of

is_type_of(t: Type) -> bool

判断资源池是否是指定类型的资源

参数:

名称 类型 描述 默认
t Type

资源类型

必需

返回:

名称 类型 描述
bool bool

是否是指定类型的资源

源代码位于: logis/biz/sim/agent/iface/pool.py
def is_type_of(self, t: Type) -> bool:
    """
    判断资源池是否是指定类型的资源

    Args:
        t: 资源类型

    Returns:
        bool: 是否是指定类型的资源
    """
    if not self.resources:
        return False
    return isinstance(self.resources[0], t)

release_resource abstractmethod

release_resource(resource: AgentClass, *args, **kwargs)

释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用: 1. before_release_resource 2. do_release_resource 3. after_resource_released

如果不满足您的需求,请自定义IAgentSelectionStrategy策略并实现release方法

参数:

名称 类型 描述 默认
resource AgentClass

要释放的资源

必需
kwargs

其他参数

{}
源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def release_resource(self, resource: AgentClass, *args, **kwargs):
    """
    释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
    1. before_release_resource
    2. do_release_resource
    3. after_resource_released

    如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`release`方法

    Args:
        resource: 要释放的资源
        kwargs: 其他参数
    """

    agent_pool = self
    agent_pool.before_release_resource(resource=resource, **kwargs)
    e = agent_pool.do_release_resource(resource=resource)
    # TODO: 这里是否不等待就可以
    yield e
    agent_pool.after_resource_released(resource=resource, **kwargs)
    return True

request_resource abstractmethod

request_resource(fast_fail: bool = False, **kwargs) -> Generator[simpy.Event, Any, Optional[IAgent]]

申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用: 1. before_request_resource 2. do_request_resource 3. after_resource_requested

如果不满足您的需求,请自定义IAgentSelectionStrategy策略并实现request方法

参数:

名称 类型 描述 默认
strategy

智能体选择策略

必需
fast_fail bool

是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None

False
kwargs

其他参数

{}

返回:

类型 描述
Optional[IAgent]

智能体事件生成器

源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def request_resource(
    self,
    fast_fail: bool = False,
    **kwargs,
) -> Generator[simpy.Event, Any, Optional[IAgent]]:
    """
    申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
    1. before_request_resource
    2. do_request_resource
    3. after_resource_requested

    如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`request`方法

    Args:
        strategy: 智能体选择策略
        fast_fail: 是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None
        kwargs: 其他参数

    Returns:
        智能体事件生成器
    """

    if fast_fail and self.available_quantity <= 0:
        return None

    self.before_request_resource(**kwargs)
    try:
        req = self.do_request_resource()
        resource: Optional["IAgent"] = yield req
    except simpy.Interrupt:
        self.cancel_request_resource(req)
        resource = None

    if resource:
        self.after_resource_requested(resource=resource, **kwargs)
    return resource

set_locked

set_locked(resource: AgentClass, **kwargs)

标记资源为已被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待标记的资源

必需
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功标记资源为已被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def set_locked(self, resource: AgentClass, **kwargs):
    """
    标记资源为已被锁定

    Args:
        resource: 待标记的资源
        kwargs: 其他参数

    Returns:
        bool: 是否成功标记资源为已被锁定
    """
    self._locked_resources[resource.id] = resource

    return True

unset_locked

unset_locked(resource: AgentClass, **kwargs)

取消标记资源为未被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待取消标记的资源

必需
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功取消标记资源为未被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def unset_locked(self, resource: AgentClass, **kwargs):
    """
    取消标记资源为未被锁定

    Args:
        resource: 待取消标记的资源
        kwargs: 其他参数

    Returns:
        bool: 是否成功取消标记资源为未被锁定
    """
    self._locked_resources.pop(resource.id, None)
    return True

IAgentSelectionStrategy

智能体选择策略

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
class IAgentSelectionStrategy(IExpose):
    """
    智能体选择策略
    """

    def __init__(self, agent_pool: Optional[IAgentPool] = None, **kwargs) -> None:
        """
        初始化智能体选择策略

        Args:
            agent_pool: 智能体池
            kwargs: 其他参数
        """
        super().__init__(**kwargs)
        self.agent_pool = agent_pool

    @abstractmethod
    def request(
        self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
    ) -> Optional[IAgent]:
        """
        选择符合需求的智能体

        Args:
            agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
            fast_fail: 是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None
            kwargs: 其他参数

        Returns:
            Optional[IAgent]: 符合需求的智能体
        """

    def release(self, agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs):
        """
        释放智能体

        Args:
            agent: 智能体
            agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
            kwargs: 其他参数

        Returns:
            bool: 是否成功释放智能体
        """
        agent_pool = agent_pool or self.agent_pool
        assert agent_pool, "未指定资源池,无法释放资源"
        agent_pool.before_release_resource(resource=agent, **kwargs)
        e = agent_pool.do_release_resource(resource=agent)
        # TODO: 这里是否不等待就可以
        yield e
        agent_pool.after_resource_released(resource=agent, **kwargs)
        return True

agent_pool instance-attribute

agent_pool = agent_pool

__init__

__init__(agent_pool: Optional[IAgentPool] = None, **kwargs) -> None

初始化智能体选择策略

参数:

名称 类型 描述 默认
agent_pool Optional[IAgentPool]

智能体池

None
kwargs

其他参数

{}
源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
def __init__(self, agent_pool: Optional[IAgentPool] = None, **kwargs) -> None:
    """
    初始化智能体选择策略

    Args:
        agent_pool: 智能体池
        kwargs: 其他参数
    """
    super().__init__(**kwargs)
    self.agent_pool = agent_pool

release

release(agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs)

释放智能体

参数:

名称 类型 描述 默认
agent IAgent

智能体

必需
agent_pool Optional[IAgentPool]

智能体池,如果不传,默认使用初始化时指定的智能体池

None
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功释放智能体

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
def release(self, agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs):
    """
    释放智能体

    Args:
        agent: 智能体
        agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
        kwargs: 其他参数

    Returns:
        bool: 是否成功释放智能体
    """
    agent_pool = agent_pool or self.agent_pool
    assert agent_pool, "未指定资源池,无法释放资源"
    agent_pool.before_release_resource(resource=agent, **kwargs)
    e = agent_pool.do_release_resource(resource=agent)
    # TODO: 这里是否不等待就可以
    yield e
    agent_pool.after_resource_released(resource=agent, **kwargs)
    return True

request abstractmethod

request(agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs) -> Optional[IAgent]

选择符合需求的智能体

参数:

名称 类型 描述 默认
agent_pool Optional[IAgentPool]

智能体池,如果不传,默认使用初始化时指定的智能体池

None
fast_fail bool

是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None

False
kwargs

其他参数

{}

返回:

类型 描述
Optional[IAgent]

Optional[IAgent]: 符合需求的智能体

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
@abstractmethod
def request(
    self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
) -> Optional[IAgent]:
    """
    选择符合需求的智能体

    Args:
        agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
        fast_fail: 是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None
        kwargs: 其他参数

    Returns:
        Optional[IAgent]: 符合需求的智能体
    """

IExpose

用于通过SDK向外暴露参数

源代码位于: logis/biz/sim/iface/base.py
class IExpose(ABC):
    """
    用于通过SDK向外暴露参数
    """

    def __init__(self, ctx: "Context", **kwargs) -> None:
        self.ctx = ctx

ctx instance-attribute

ctx = ctx

__init__

__init__(ctx: Context, **kwargs) -> None
源代码位于: logis/biz/sim/iface/base.py
def __init__(self, ctx: "Context", **kwargs) -> None:
    self.ctx = ctx

IGrid

网格

源代码位于: logis/biz/sim/component/iface/grid.py
class IGrid(ABC):
    """
    网格
    """

    @property
    @abstractmethod
    def agents(self) -> Iterable["IAgent"]:
        """
        所有的智能体
        """
        pass

    @property
    @abstractmethod
    def racks(self) -> Iterable["IRack"]:
        """
        所有的货架
        """
        pass

    @property
    @abstractmethod
    def points(self) -> Iterable[Point]:
        """
        所有的点
        """
        pass

    @abstractmethod
    def get_point_at(self, i: int, j: int) -> Point:
        """
        根据网格坐标获取点坐标
        """
        pass

    @abstractmethod
    def get_index_of_point(self, point: Point) -> Tuple[int, int]:
        """
        根据点坐标获取网格坐标
        """
        pass

    @abstractmethod
    def get_free_points(self) -> List[Point]:
        """
        获取空闲的点
        """
        pass

agents abstractmethod property

agents: Iterable[IAgent]

所有的智能体

points abstractmethod property

points: Iterable[Point]

所有的点

racks abstractmethod property

racks: Iterable[IRack]

所有的货架

get_free_points abstractmethod

get_free_points() -> List[Point]

获取空闲的点

源代码位于: logis/biz/sim/component/iface/grid.py
@abstractmethod
def get_free_points(self) -> List[Point]:
    """
    获取空闲的点
    """
    pass

get_index_of_point abstractmethod

get_index_of_point(point: Point) -> Tuple[int, int]

根据点坐标获取网格坐标

源代码位于: logis/biz/sim/component/iface/grid.py
@abstractmethod
def get_index_of_point(self, point: Point) -> Tuple[int, int]:
    """
    根据点坐标获取网格坐标
    """
    pass

get_point_at abstractmethod

get_point_at(i: int, j: int) -> Point

根据网格坐标获取点坐标

源代码位于: logis/biz/sim/component/iface/grid.py
@abstractmethod
def get_point_at(self, i: int, j: int) -> Point:
    """
    根据网格坐标获取点坐标
    """
    pass

IPathGraph

路径关系图

源代码位于: logis/iface/graph.py
class IPathGraph(ABC, Generic[E, V]):
    """
    路径关系图
    """

    @abstractmethod
    def edges(self, *args, **kwargs) -> List[E]:
        """
        所有的边
        """
        pass

    @abstractmethod
    def nodes(self, *args, **kwargs) -> List[V]:
        """
        所有的节点
        """
        pass

edges abstractmethod

edges(*args, **kwargs) -> List[E]

所有的边

源代码位于: logis/iface/graph.py
@abstractmethod
def edges(self, *args, **kwargs) -> List[E]:
    """
    所有的边
    """
    pass

nodes abstractmethod

nodes(*args, **kwargs) -> List[V]

所有的节点

源代码位于: logis/iface/graph.py
@abstractmethod
def nodes(self, *args, **kwargs) -> List[V]:
    """
    所有的节点
    """
    pass

ISimPathGraph

路径关系图

源代码位于: logis/biz/sim/graph/iface/__init__.py
class ISimPathGraph(IPathGraph[DirEdge, Point]):
    """
    路径关系图
    """

    @property
    @abstractmethod
    def env(self) -> simpy.Environment:
        pass

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        # TODO: 是否直接继承会更方便?
        self._graph_ = DiGraph()
        self._lock_nodes_map: Dict[str, Set[Point]] = defaultdict(lambda: set())
        self._path_lock = simpy.Resource(self.env)
        self.__index_point_map__: Dict[str, Point] = {}

    def request_path_lock(self):
        return self._path_lock.request()

    def nodes(self, *args, **kwargs) -> List[Point]:
        return list(self._graph_.nodes(*args, **kwargs))

    def edges(self, *args, **kwargs) -> List[DirEdge]:
        return list(self._graph_.edges(*args, **kwargs))

    def lock_node(self, lock_id: str, node: Point):
        self._lock_nodes_map[lock_id].add(node)

    def unlock_node(self, lock_id: str, node: Point):
        if node in self._lock_nodes_map[lock_id]:
            self._lock_nodes_map[lock_id].remove(node)

    def unlock_nodes_by_lock_id(self, lock_id: str):
        self._lock_nodes_map[lock_id].clear()

    def get_locked_by_other(self, my_lock_id: str):
        return set().union(
            *[nodes for id, nodes in self._lock_nodes_map.items() if id != my_lock_id]
        )

    def lock_path(self, lock_id: str, path: List[Point], limit: Optional[int] = None):
        """
        锁格
        """
        path = path[0:limit]
        while True:
            with self.request_path_lock() as req:
                yield req
                locked = self.get_locked_by_other(lock_id)
                if len(locked.intersection(path)) == 0:
                    # 在获取锁的情况下立即锁定路径
                    for p in path:
                        self._lock_nodes_map[lock_id].add(p)
                    break
            # 未获取锁或路径被锁定,等待后重试
            yield self.env.timeout(0.5)

    def unlock_path(self, lock_id: str, path: List[Point]):
        for p in path:
            if p in self._lock_nodes_map[lock_id]:
                self._lock_nodes_map[lock_id].remove(p)

    def add_nodes(self, *nodes: Point):
        for node in nodes:
            self._graph_.add_node(node)

    def add_edges(self, *edges: DirEdge):
        for edge in edges:
            self._graph_.add_edge(edge.starter, edge.ender, weight=edge.get_weight())

    def remove_edges(self, *edges: DirEdge):
        for edge in edges:
            self._graph_.remove_edge(edge.starter, edge.ender)

    def remove_edges_if_exists(self, *edges: DirEdge):
        for edge in edges:
            if edge.starter in self._graph_ and edge.ender in self._graph_:
                self._graph_.remove_edge(edge.starter, edge.ender)

    def get_index_point_map(self, init: bool = False, **kwargs):
        """
        获取索引点映射

        Args:
            init: 是否强制重新初始化映射,默认False

        Returns:
            Dict[str, Point]
        """
        return self.__index_point_map__

    def get_point_by_index(self, index: str):
        return self.__index_point_map__.get(index)

    def realtime_obstacles(
        self, my_lock_id: str, config: "ObstacleDetectorConfig", **kwargs
    ) -> Tuple[List[Point], List[Point]]:
        """
        实时获取障碍物

        Args:
            my_lock_id: 获取方的锁ID
            config: 障碍物检测配置
            **kwargs: 其他参数

        Returns:
            Tuple[List[Point], List[Point]]: (智能体障碍物点列表, 所有障碍物点列表)
        """
        raise NotImplementedError("realtime_obstacles not implemented")

    @abstractmethod
    def find_path(
        self,
        src: Point,
        dest: Point,
        alg: PathFindingAlgorithm,
        excluded_vertices: List[Point] = [],
        **kwargs,
    ) -> PathFindingOutput:
        """
        路径规划

        TODO: 后续把图作为参数传入算法,而不是把算法作为参数传入图

        Args:
            src: 起始点
            dest: 目的地
            alg: 路径规划算法,默认AStar算法
            excluded_vertices: 排除的顶点,默认空列表
            **kwargs: 其他参数

        Returns:
            PathFindingOutput: 路径规划结果
        """

__index_point_map__ instance-attribute

__index_point_map__: Dict[str, Point] = {}

env abstractmethod property

env: Environment

__init__

__init__(*args, **kwargs) -> None
源代码位于: logis/biz/sim/graph/iface/__init__.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__()
    # TODO: 是否直接继承会更方便?
    self._graph_ = DiGraph()
    self._lock_nodes_map: Dict[str, Set[Point]] = defaultdict(lambda: set())
    self._path_lock = simpy.Resource(self.env)
    self.__index_point_map__: Dict[str, Point] = {}

add_edges

add_edges(*edges: DirEdge)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def add_edges(self, *edges: DirEdge):
    for edge in edges:
        self._graph_.add_edge(edge.starter, edge.ender, weight=edge.get_weight())

add_nodes

add_nodes(*nodes: Point)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def add_nodes(self, *nodes: Point):
    for node in nodes:
        self._graph_.add_node(node)

edges

edges(*args, **kwargs) -> List[DirEdge]
源代码位于: logis/biz/sim/graph/iface/__init__.py
def edges(self, *args, **kwargs) -> List[DirEdge]:
    return list(self._graph_.edges(*args, **kwargs))

find_path abstractmethod

find_path(src: Point, dest: Point, alg: PathFindingAlgorithm, excluded_vertices: List[Point] = [], **kwargs) -> PathFindingOutput

路径规划

TODO: 后续把图作为参数传入算法,而不是把算法作为参数传入图

参数:

名称 类型 描述 默认
src Point

起始点

必需
dest Point

目的地

必需
alg PathFindingAlgorithm

路径规划算法,默认AStar算法

必需
excluded_vertices List[Point]

排除的顶点,默认空列表

[]
**kwargs

其他参数

{}

返回:

名称 类型 描述
PathFindingOutput PathFindingOutput

路径规划结果

源代码位于: logis/biz/sim/graph/iface/__init__.py
@abstractmethod
def find_path(
    self,
    src: Point,
    dest: Point,
    alg: PathFindingAlgorithm,
    excluded_vertices: List[Point] = [],
    **kwargs,
) -> PathFindingOutput:
    """
    路径规划

    TODO: 后续把图作为参数传入算法,而不是把算法作为参数传入图

    Args:
        src: 起始点
        dest: 目的地
        alg: 路径规划算法,默认AStar算法
        excluded_vertices: 排除的顶点,默认空列表
        **kwargs: 其他参数

    Returns:
        PathFindingOutput: 路径规划结果
    """

get_index_point_map

get_index_point_map(init: bool = False, **kwargs)

获取索引点映射

参数:

名称 类型 描述 默认
init bool

是否强制重新初始化映射,默认False

False

返回:

类型 描述

Dict[str, Point]

源代码位于: logis/biz/sim/graph/iface/__init__.py
def get_index_point_map(self, init: bool = False, **kwargs):
    """
    获取索引点映射

    Args:
        init: 是否强制重新初始化映射,默认False

    Returns:
        Dict[str, Point]
    """
    return self.__index_point_map__

get_locked_by_other

get_locked_by_other(my_lock_id: str)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def get_locked_by_other(self, my_lock_id: str):
    return set().union(
        *[nodes for id, nodes in self._lock_nodes_map.items() if id != my_lock_id]
    )

get_point_by_index

get_point_by_index(index: str)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def get_point_by_index(self, index: str):
    return self.__index_point_map__.get(index)

lock_node

lock_node(lock_id: str, node: Point)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def lock_node(self, lock_id: str, node: Point):
    self._lock_nodes_map[lock_id].add(node)

lock_path

lock_path(lock_id: str, path: List[Point], limit: Optional[int] = None)

锁格

源代码位于: logis/biz/sim/graph/iface/__init__.py
def lock_path(self, lock_id: str, path: List[Point], limit: Optional[int] = None):
    """
    锁格
    """
    path = path[0:limit]
    while True:
        with self.request_path_lock() as req:
            yield req
            locked = self.get_locked_by_other(lock_id)
            if len(locked.intersection(path)) == 0:
                # 在获取锁的情况下立即锁定路径
                for p in path:
                    self._lock_nodes_map[lock_id].add(p)
                break
        # 未获取锁或路径被锁定,等待后重试
        yield self.env.timeout(0.5)

nodes

nodes(*args, **kwargs) -> List[Point]
源代码位于: logis/biz/sim/graph/iface/__init__.py
def nodes(self, *args, **kwargs) -> List[Point]:
    return list(self._graph_.nodes(*args, **kwargs))

realtime_obstacles

realtime_obstacles(my_lock_id: str, config: ObstacleDetectorConfig, **kwargs) -> Tuple[List[Point], List[Point]]

实时获取障碍物

参数:

名称 类型 描述 默认
my_lock_id str

获取方的锁ID

必需
config ObstacleDetectorConfig

障碍物检测配置

必需
**kwargs

其他参数

{}

返回:

类型 描述
Tuple[List[Point], List[Point]]

Tuple[List[Point], List[Point]]: (智能体障碍物点列表, 所有障碍物点列表)

源代码位于: logis/biz/sim/graph/iface/__init__.py
def realtime_obstacles(
    self, my_lock_id: str, config: "ObstacleDetectorConfig", **kwargs
) -> Tuple[List[Point], List[Point]]:
    """
    实时获取障碍物

    Args:
        my_lock_id: 获取方的锁ID
        config: 障碍物检测配置
        **kwargs: 其他参数

    Returns:
        Tuple[List[Point], List[Point]]: (智能体障碍物点列表, 所有障碍物点列表)
    """
    raise NotImplementedError("realtime_obstacles not implemented")

remove_edges

remove_edges(*edges: DirEdge)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def remove_edges(self, *edges: DirEdge):
    for edge in edges:
        self._graph_.remove_edge(edge.starter, edge.ender)

remove_edges_if_exists

remove_edges_if_exists(*edges: DirEdge)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def remove_edges_if_exists(self, *edges: DirEdge):
    for edge in edges:
        if edge.starter in self._graph_ and edge.ender in self._graph_:
            self._graph_.remove_edge(edge.starter, edge.ender)

request_path_lock

request_path_lock()
源代码位于: logis/biz/sim/graph/iface/__init__.py
def request_path_lock(self):
    return self._path_lock.request()

unlock_node

unlock_node(lock_id: str, node: Point)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def unlock_node(self, lock_id: str, node: Point):
    if node in self._lock_nodes_map[lock_id]:
        self._lock_nodes_map[lock_id].remove(node)

unlock_nodes_by_lock_id

unlock_nodes_by_lock_id(lock_id: str)
源代码位于: logis/biz/sim/graph/iface/__init__.py
def unlock_nodes_by_lock_id(self, lock_id: str):
    self._lock_nodes_map[lock_id].clear()

unlock_path

unlock_path(lock_id: str, path: List[Point])
源代码位于: logis/biz/sim/graph/iface/__init__.py
def unlock_path(self, lock_id: str, path: List[Point]):
    for p in path:
        if p in self._lock_nodes_map[lock_id]:
            self._lock_nodes_map[lock_id].remove(p)

Point

此类不通用,历史遗留,不建议使用,如有需要建议使用GenericPoint 支持x,y,z三维坐标点,也可当作二维坐标使用 TODO: 处理单位

源代码位于: logis/data_type/point.py
class Point(GenericPoint[float]):
    """
    此类不通用,历史遗留,不建议使用,如有需要建议使用GenericPoint
    支持x,y,z三维坐标点,也可当作二维坐标使用
    TODO: 处理单位
    """

    __counter = count(0, step=1)
    __global_precision__ = 3

    @classmethod
    def try_parse(cls, dc: dict, **kwargs) -> "Point":
        """
        尝试读取X、Y、Z坐标,返回一个Point对象
        """
        if not isinstance(dc, dict):
            raise ValueError("Expected a dictionary for Point parsing.")
        xyz = [dc.get(k, None) for k in ("X", "Y", "Z")]
        return Point(xyz, **kwargs)

    @classmethod
    def from_tuple(cls, args: Tuple[Number], **kwargs):
        """
        从元组创建点
        """
        return cls(args, **kwargs)

    @classmethod
    def of(
        cls,
        x: Optional[Number] = None,
        y: Optional[Number] = None,
        z: Optional[Number] = None,
        **kwargs,
    ) -> "Point":
        p = Point(x, y, z, **kwargs)
        return p

    def __init__(self, *args, precision: Optional[int] = None, **kwargs):
        """
        支持[x,y]、x,y、[x,y,z]、x,y,z传参赋值
        默认x=y=z=0

        TODO: 不应该自动转float、不应该自动保留两位小数
        """
        self._id = next(self.__counter)
        self.unit: Optional[str] = kwargs.get("unit", None)

        x: Optional[Number] = kwargs.get("x", None)
        y: Optional[Number] = kwargs.get("y", None)
        z: Optional[Number] = kwargs.get("z", None)
        if len(args) == 1:
            assert isinstance(
                args[0], (list, tuple)
            ), "single argument must be a list or tuple."
            x = cast_if_not_none(args[0][0], float)
            y = cast_if_not_none(args[0][1], float)
            if len(args[0]) > 2 and (z := args[0][2]) is not None:
                z = cast_if_not_none(z, float)

        elif len(args) >= 2:
            x = cast_if_not_none(args[0], float)
            y = cast_if_not_none(args[1], float)
            if len(args) > 2 and (z := args[2]) is not None:
                z = cast_if_not_none(z, float)

        # 调试过程中发现z有时候是None有时候是0,因此这里统一给了默认值
        # 实际上应该外层规范
        x = x if x is not None else 0
        y = y if y is not None else 0
        z = z if z is not None else 0

        kwargs.update(x=x, y=y, z=z)

        super().__init__(precision=precision, **kwargs)

    def __repr__(self):
        return f"Point({self.x}, {self.y}, {self.z})"

    def __hash__(self):

        def s(v: Optional[Number]):
            if not self.precision:
                return v
            if v is None:
                return v
            return v
            # return str(Decimal(v).quantize(Decimal(f"0.{'0'*self.precision}")))

        # if self._id is not None:
        #     return hash((self.__class__.__name__, self._id))
        return hash((s(self.x), s(self.y), s(self.z), self.precision, self.unit))

    def __lt__(self, other):
        """
        FIXME: 考虑z
        """
        if isinstance(other, Point):
            # TODO: 只要任何一个大于就行?
            return (self.x, self.y) < (other.x, other.y)
        return NotImplemented

    def __sub__(self, other: "Point"):
        def sub(a: Number, b: Number):
            if a is None and b is None:
                return None
            a = a or 0
            b = b or 0
            return a - b

        assert self.unit == other.unit, "unit must be same"
        return Point.of(
            x=sub(self.x, other.x),
            y=sub(self.y, other.y),
            z=sub(self.z, other.z),
            unit=self.unit,
        )

    def to_tuple(self) -> TuplePoint:
        return (
            self.x,
            self.y,
            self.z,
        )

    def __str__(self):
        return f"Point(id={self._id},x={self.x},y={self.y},z={self.z})"

__counter class-attribute instance-attribute

__counter = count(0, step=1)

__global_precision__ class-attribute instance-attribute

__global_precision__ = 3

unit instance-attribute

unit: Optional[str] = get('unit', None)

__hash__

__hash__()
源代码位于: logis/data_type/point.py
def __hash__(self):

    def s(v: Optional[Number]):
        if not self.precision:
            return v
        if v is None:
            return v
        return v
        # return str(Decimal(v).quantize(Decimal(f"0.{'0'*self.precision}")))

    # if self._id is not None:
    #     return hash((self.__class__.__name__, self._id))
    return hash((s(self.x), s(self.y), s(self.z), self.precision, self.unit))

__init__

__init__(*args, precision: Optional[int] = None, **kwargs)

支持[x,y]、x,y、[x,y,z]、x,y,z传参赋值 默认x=y=z=0

TODO: 不应该自动转float、不应该自动保留两位小数

源代码位于: logis/data_type/point.py
def __init__(self, *args, precision: Optional[int] = None, **kwargs):
    """
    支持[x,y]、x,y、[x,y,z]、x,y,z传参赋值
    默认x=y=z=0

    TODO: 不应该自动转float、不应该自动保留两位小数
    """
    self._id = next(self.__counter)
    self.unit: Optional[str] = kwargs.get("unit", None)

    x: Optional[Number] = kwargs.get("x", None)
    y: Optional[Number] = kwargs.get("y", None)
    z: Optional[Number] = kwargs.get("z", None)
    if len(args) == 1:
        assert isinstance(
            args[0], (list, tuple)
        ), "single argument must be a list or tuple."
        x = cast_if_not_none(args[0][0], float)
        y = cast_if_not_none(args[0][1], float)
        if len(args[0]) > 2 and (z := args[0][2]) is not None:
            z = cast_if_not_none(z, float)

    elif len(args) >= 2:
        x = cast_if_not_none(args[0], float)
        y = cast_if_not_none(args[1], float)
        if len(args) > 2 and (z := args[2]) is not None:
            z = cast_if_not_none(z, float)

    # 调试过程中发现z有时候是None有时候是0,因此这里统一给了默认值
    # 实际上应该外层规范
    x = x if x is not None else 0
    y = y if y is not None else 0
    z = z if z is not None else 0

    kwargs.update(x=x, y=y, z=z)

    super().__init__(precision=precision, **kwargs)

__lt__

__lt__(other)

FIXME: 考虑z

源代码位于: logis/data_type/point.py
def __lt__(self, other):
    """
    FIXME: 考虑z
    """
    if isinstance(other, Point):
        # TODO: 只要任何一个大于就行?
        return (self.x, self.y) < (other.x, other.y)
    return NotImplemented

__repr__

__repr__()
源代码位于: logis/data_type/point.py
def __repr__(self):
    return f"Point({self.x}, {self.y}, {self.z})"

__str__

__str__()
源代码位于: logis/data_type/point.py
def __str__(self):
    return f"Point(id={self._id},x={self.x},y={self.y},z={self.z})"

__sub__

__sub__(other: Point)
源代码位于: logis/data_type/point.py
def __sub__(self, other: "Point"):
    def sub(a: Number, b: Number):
        if a is None and b is None:
            return None
        a = a or 0
        b = b or 0
        return a - b

    assert self.unit == other.unit, "unit must be same"
    return Point.of(
        x=sub(self.x, other.x),
        y=sub(self.y, other.y),
        z=sub(self.z, other.z),
        unit=self.unit,
    )

from_tuple classmethod

from_tuple(args: Tuple[Number], **kwargs)

从元组创建点

源代码位于: logis/data_type/point.py
@classmethod
def from_tuple(cls, args: Tuple[Number], **kwargs):
    """
    从元组创建点
    """
    return cls(args, **kwargs)

of classmethod

of(x: Optional[Number] = None, y: Optional[Number] = None, z: Optional[Number] = None, **kwargs) -> Point
源代码位于: logis/data_type/point.py
@classmethod
def of(
    cls,
    x: Optional[Number] = None,
    y: Optional[Number] = None,
    z: Optional[Number] = None,
    **kwargs,
) -> "Point":
    p = Point(x, y, z, **kwargs)
    return p

to_tuple

to_tuple() -> TuplePoint
源代码位于: logis/data_type/point.py
def to_tuple(self) -> TuplePoint:
    return (
        self.x,
        self.y,
        self.z,
    )

try_parse classmethod

try_parse(dc: dict, **kwargs) -> Point

尝试读取X、Y、Z坐标,返回一个Point对象

源代码位于: logis/data_type/point.py
@classmethod
def try_parse(cls, dc: dict, **kwargs) -> "Point":
    """
    尝试读取X、Y、Z坐标,返回一个Point对象
    """
    if not isinstance(dc, dict):
        raise ValueError("Expected a dictionary for Point parsing.")
    xyz = [dc.get(k, None) for k in ("X", "Y", "Z")]
    return Point(xyz, **kwargs)

Speed

源代码位于: logis/data_type/unitable.py
class Speed(QuantifiedValue):
    pass