跳转至

搬运

AgentIdleStrategyOption module-attribute

AgentIdleStrategyOption: TypeAlias = Literal['返回到归属地位置', '停留在原地', '自定义']

AgentSelectionStrategyName module-attribute

AgentSelectionStrategyName: TypeAlias = Literal['距离近优先', '空闲优先', '自定义']

GoHomeStrategyFrequency module-attribute

GoHomeStrategyFrequency: TypeAlias = Literal['每次', '如果无其他任务']

AgentIdleStrategy

智能体空闲策略

源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
class AgentIdleStrategy(IExpose):
    """
    智能体空闲策略
    """

    def __init__(self, agent: Optional[AgentClass] = None, **kwargs) -> None:
        """
        初始化智能体空闲策略

        Args:
            agent: 智能体,是否要传取决于此策略是否仅针对此智能体
        """
        super().__init__(**kwargs)
        self.agent = agent

    def is_idle(self, agent: Optional[AgentClass] = None, **kwargs) -> bool:
        """
        判断智能体是否空闲

        Args:
            agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
        """
        raise NotImplementedError("is_idle方法尚未实现")

    @abstractmethod
    def on_idle(
        self,
        agent: Optional[AgentClass] = None,
        is_idle: Optional[Predicate[AgentClass]] = None,
        **kwargs,
    ) -> Generator[simpy.Event, Any, None]:
        """
        智能体做完任务之后的回调逻辑

        Args:
            agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
            is_idle: 判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断
        """

agent instance-attribute

agent = agent

__init__

__init__(agent: Optional[AgentClass] = None, **kwargs) -> None

初始化智能体空闲策略

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

智能体,是否要传取决于此策略是否仅针对此智能体

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def __init__(self, agent: Optional[AgentClass] = None, **kwargs) -> None:
    """
    初始化智能体空闲策略

    Args:
        agent: 智能体,是否要传取决于此策略是否仅针对此智能体
    """
    super().__init__(**kwargs)
    self.agent = agent

is_idle

is_idle(agent: Optional[AgentClass] = None, **kwargs) -> bool

判断智能体是否空闲

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

如果不传则使用初始化时传的智能体,否则使用传参的智能体

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
def is_idle(self, agent: Optional[AgentClass] = None, **kwargs) -> bool:
    """
    判断智能体是否空闲

    Args:
        agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
    """
    raise NotImplementedError("is_idle方法尚未实现")

on_idle abstractmethod

on_idle(agent: Optional[AgentClass] = None, is_idle: Optional[Predicate[AgentClass]] = None, **kwargs) -> Generator[simpy.Event, Any, None]

智能体做完任务之后的回调逻辑

参数:

名称 类型 描述 默认
agent Optional[AgentClass]

如果不传则使用初始化时传的智能体,否则使用传参的智能体

None
is_idle Optional[Predicate[AgentClass]]

判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断

None
源代码位于: logis/biz/sim/agent/iface/idle_strategy.py
@abstractmethod
def on_idle(
    self,
    agent: Optional[AgentClass] = None,
    is_idle: Optional[Predicate[AgentClass]] = None,
    **kwargs,
) -> Generator[simpy.Event, Any, None]:
    """
    智能体做完任务之后的回调逻辑

    Args:
        agent: 如果不传则使用初始化时传的智能体,否则使用传参的智能体
        is_idle: 判断智能体是否空闲的方法,如果不传则使用is_idle方法,否则使用传参的方法判断
    """

AutonomousMovable

自主移动

源代码位于: logis/biz/sim/transport/__init__.py
class AutonomousMovable(metaclass=ABCMeta):
    """
    自主移动
    """

Entity

entity模块,用来存储交互文件中结点(事件、资源池、货架组等)以及所有空间标记的信息

TODO:类型标注、泛型

源代码位于: logis/biz/sim/component/model/__init__.py
class ComponentForm:
    """
    entity模块,用来存储交互文件中结点(事件、资源池、货架组等)以及所有空间标记的信息

    TODO:类型标注、泛型
    """

    def __init__(self, entity_data: Dict[str, Any], **kwargs):
        self.type: str = entity_data.get("Type")
        # 实际上是类型id
        self.type_id = self.id = str(entity_data.get("ID"))
        self.type_name: str = entity_data.get("Name", "Unknown")
        self.create_edit_id: str = str(entity_data.get("Create_Edit_ID", ""))
        # TODO:这个字段看起来没什么用,考虑去除
        self.agent_id: TmpId = entity_data.get("AgentID")
        # TODO:这个字段还没看到在哪用的
        self.link_ids: Dict[str, Any] = entity_data.get("LinkIDs", {})
        self.extends_dict: Dict[str, Any] = {}
        self.properties: Dict[str, Any] = {}
        self.disabled: bool = entity_data.get("disabled", False)

        if not self.create_edit_id:
            logging.warning(
                f"Warning: Entity {self.type_name} is missing 'Create_Edit_ID'"
            )

    @property
    def name(self) -> Optional[str]:
        return self.properties.get("名称")

    @property
    def display_name(self) -> Optional[str]:
        return self.properties.get("显示名称")

agent_id instance-attribute

agent_id: TmpId = get('AgentID')

create_edit_id instance-attribute

create_edit_id: str = str(get('Create_Edit_ID', ''))

disabled instance-attribute

disabled: bool = get('disabled', False)

display_name property

display_name: Optional[str]

extends_dict instance-attribute

extends_dict: Dict[str, Any] = {}

id instance-attribute

id = str(get('ID'))
link_ids: Dict[str, Any] = get('LinkIDs', {})

name property

name: Optional[str]

properties instance-attribute

properties: Dict[str, Any] = {}

type instance-attribute

type: str = get('Type')

type_id instance-attribute

type_id = str(get('ID'))

type_name instance-attribute

type_name: str = get('Name', 'Unknown')

__init__

__init__(entity_data: Dict[str, Any], **kwargs)
源代码位于: logis/biz/sim/component/model/__init__.py
def __init__(self, entity_data: Dict[str, Any], **kwargs):
    self.type: str = entity_data.get("Type")
    # 实际上是类型id
    self.type_id = self.id = str(entity_data.get("ID"))
    self.type_name: str = entity_data.get("Name", "Unknown")
    self.create_edit_id: str = str(entity_data.get("Create_Edit_ID", ""))
    # TODO:这个字段看起来没什么用,考虑去除
    self.agent_id: TmpId = entity_data.get("AgentID")
    # TODO:这个字段还没看到在哪用的
    self.link_ids: Dict[str, Any] = entity_data.get("LinkIDs", {})
    self.extends_dict: Dict[str, Any] = {}
    self.properties: Dict[str, Any] = {}
    self.disabled: bool = entity_data.get("disabled", False)

    if not self.create_edit_id:
        logging.warning(
            f"Warning: Entity {self.type_name} is missing 'Create_Edit_ID'"
        )

IAgentPool

智能体资源池,生命周期钩子如下: 1. 初始化资源池 2. 在申请资源之前 3. 申请资源 4. 在资源申请到之后 5. 在资源释放之前 6. 释放资源 7. 在资源释放之后

TODO: 继承自ITaskHandler

源代码位于: logis/biz/sim/agent/iface/pool.py
class IAgentPool(ABC):
    """
    智能体资源池,生命周期钩子如下:
    1. 初始化资源池
    2. 在申请资源之前
    3. 申请资源
    4. 在资源申请到之后
    5. 在资源释放之前
    6. 释放资源
    7. 在资源释放之后

    TODO: 继承自ITaskHandler
    """

    @property
    def get_queue(self):
        """
        获取智能体的请求队列
        """
        if self.use_simpy_store:
            return self.__store__.get_queue
        return []

    @property
    def put_queue(self):
        """
        释放智能体的请求队列
        """
        if self.use_simpy_store:
            return self.__store__.put_queue
        return []

    @property
    @abstractmethod
    def env(self) -> simpy.Environment:
        """
        仿真环境实例
        """
        pass

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        self.capacity: Optional[int] = None
        """资源池容量"""

        self.use_simpy_store: bool = False
        """是否使用simpy.Store来模拟资源池"""
        self.__store__: Optional[simpy.Store] = None
        self.__lock__ = simpy.Resource(self.env, 1)
        # 所有资源
        self._all_resources: Dict[AgentId, IAgent] = defaultdict()
        # 使用中的资源,是所有资源的子集,用于记录当前正在使用的资源
        self._locked_resources: Dict[AgentId, IAgent] = defaultdict()

    def init_simpy_store(self):
        """
        初始化simpy资源池
        """
        self.__store__ = simpy.Store(self.env, self.capacity)

    def set_locked(self, resource: AgentClass, **kwargs):
        """
        标记资源为已被锁定

        Args:
            resource: 待标记的资源
            kwargs: 其他参数

        Returns:
            bool: 是否成功标记资源为已被锁定
        """
        self._locked_resources[resource.id] = resource

        return True

    def unset_locked(self, resource: AgentClass, **kwargs):
        """
        取消标记资源为未被锁定

        Args:
            resource: 待取消标记的资源
            kwargs: 其他参数

        Returns:
            bool: 是否成功取消标记资源为未被锁定
        """
        self._locked_resources.pop(resource.id, None)
        return True

    def is_locked(self, resource: AgentClass) -> bool:
        """
        判断资源是否已被锁定

        Args:
            resource: 待判断的资源

        Returns:
            bool: 是否已被锁定
        """
        return resource.id in self._locked_resources

    def cancel_request_resource(self, req: simpy.Event):
        """
        取消申请资源,默认仅实现了use_simpy_store模式

        Args:
            req: 待取消的申请事件
        """
        if self.use_simpy_store:
            self.__store__.get_queue.remove(req)
        else:
            raise NotImplementedError("cancel_request_resource not implemented")

    def cancel_release_resource(self, req: simpy.Event):
        """
        取消释放资源,默认仅实现了use_simpy_store模式

        Args:
            req: 待取消的释放事件
        """
        if self.use_simpy_store:
            self.__store__.put_queue.remove(req)
        else:
            raise NotImplementedError("cancel_release_resource not implemented")

    @property
    def resources(self):
        """
        获取所有的资源列表
        """
        return list(self._all_resources.values())

    @property
    def id_resource_map(self):
        """
        获取智能体id到智能体的映射
        """
        return self._all_resources

    def add_agent(self, agent: IAgent):
        """
        添加智能体到资源池

        Args:
            agent: 智能体

        Raises:
            AssertionError: 如果资源池已满,则无法继续添加智能体

        """
        if self.capacity is not None:
            assert (
                len(self._all_resources) < self.capacity
            ), f"{self}资源池已满{self.capacity},无法添加资源"
        self._all_resources[agent.id] = agent

        if self.use_simpy_store:
            self.__store__.put(agent)

    def is_type_of(self, t: Type) -> bool:
        """
        判断资源池是否是指定类型的资源

        Args:
            t: 资源类型

        Returns:
            bool: 是否是指定类型的资源
        """
        if not self.resources:
            return False
        return isinstance(self.resources[0], t)

    @property
    def available_quantity(self):
        """
        获取资源池中可用的资源数量
        """
        if self.use_simpy_store:
            lx = len(self.__store__.items) if self.__store__ else 0
            return lx
        else:
            lx = None
        ly = len(self._all_resources) - len(self._locked_resources)
        if lx is not None and lx != ly:
            logging.debug(f"两种方式计算的结果不一致,{lx}!={ly}")
        return ly

    def get_resource_by_id(self, id: AgentId):
        """
        根据智能体id获取智能体

        Args:
            id: 智能体id

        Returns:
            Optional[IAgent]: 智能体
        """
        return self._all_resources.get(id, None)

    def before_request_resource(self, **kwargs):
        """
        资源申请之前的回调
        """
        raise NotImplementedError("before_request_resource not implemented")

    def do_request_resource(
        self, **kwargs
    ) -> Generator[simpy.Event, Any, Optional[AgentClass]]:
        """
        真正地执行申请资源
        """
        if self.use_simpy_store:
            assert self.__store__, "未初始化资源池,无法申请资源"
            return self.__store__.get()
        else:
            raise NotImplementedError("do_request_resource not implemented")

    @abstractmethod
    def after_resource_requested(self, resource: AgentClass, **kwargs):
        """
        资源申请到之后的回调

        Args:
            resource: 已申请到的资源
        """
        pass

    @abstractmethod
    def request_resource(
        self,
        fast_fail: bool = False,
        **kwargs,
    ) -> Generator[simpy.Event, Any, Optional[IAgent]]:
        """
        申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
        1. before_request_resource
        2. do_request_resource
        3. after_resource_requested

        如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`request`方法

        Args:
            strategy: 智能体选择策略
            fast_fail: 是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None
            kwargs: 其他参数

        Returns:
            智能体事件生成器
        """

        if fast_fail and self.available_quantity <= 0:
            return None

        self.before_request_resource(**kwargs)
        try:
            req = self.do_request_resource()
            resource: Optional["IAgent"] = yield req
        except simpy.Interrupt:
            self.cancel_request_resource(req)
            resource = None

        if resource:
            self.after_resource_requested(resource=resource, **kwargs)
        return resource

    def before_release_resource(self, resource: AgentClass, **kwargs):
        """
        资源释放之前的回调

        Args:
            resource: 待释放的资源
        """
        raise NotImplementedError("before_release_resource not implemented")

    def do_release_resource(self, resource: AgentClass, **kwargs):
        """
        真正地执行释放资源

        Args:
            resource: 待释放的资源
        """
        if self.use_simpy_store:
            assert self.__store__, "未初始化资源池,无法释放资源"
            return self.__store__.put(resource)
        else:
            raise NotImplementedError("do_release_resource not implemented")

    def after_resource_released(self, resource: AgentClass, **kwargs):
        """
        资源释放之后的回调

        Args:
            resource: 已释放的资源
        """
        raise NotImplementedError("after_resource_released not implemented")

    @abstractmethod
    def release_resource(self, resource: AgentClass, *args, **kwargs):
        """
        释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
        1. before_release_resource
        2. do_release_resource
        3. after_resource_released

        如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`release`方法

        Args:
            resource: 要释放的资源
            kwargs: 其他参数
        """

        agent_pool = self
        agent_pool.before_release_resource(resource=resource, **kwargs)
        e = agent_pool.do_release_resource(resource=resource)
        # TODO: 这里是否不等待就可以
        yield e
        agent_pool.after_resource_released(resource=resource, **kwargs)
        return True

    @abstractmethod
    def assign_resources(
        self, task_type: str, task_keys: List[T], **kwargs
    ) -> Generator[
        simpy.Event, Any, Dict[AgentId, "AgentManifest[AgentClass, TaskType]"]
    ]:
        """
        输入任务分配智能体

        Args:
            task_type: 任务类型
            task_keys: 任务key列表
            kwargs: 其他参数

        Returns:
            智能体id到智能体manifest的映射
        """

__lock__ instance-attribute

__lock__ = Resource(env, 1)

__store__ instance-attribute

__store__: Optional[Store] = None

available_quantity property

available_quantity

获取资源池中可用的资源数量

capacity instance-attribute

capacity: Optional[int] = None

资源池容量

env abstractmethod property

env: Environment

仿真环境实例

get_queue property

get_queue

获取智能体的请求队列

id_resource_map property

id_resource_map

获取智能体id到智能体的映射

put_queue property

put_queue

释放智能体的请求队列

resources property

resources

获取所有的资源列表

use_simpy_store instance-attribute

use_simpy_store: bool = False

是否使用simpy.Store来模拟资源池

__init__

__init__(*args, **kwargs) -> None
源代码位于: logis/biz/sim/agent/iface/pool.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__()
    self.capacity: Optional[int] = None
    """资源池容量"""

    self.use_simpy_store: bool = False
    """是否使用simpy.Store来模拟资源池"""
    self.__store__: Optional[simpy.Store] = None
    self.__lock__ = simpy.Resource(self.env, 1)
    # 所有资源
    self._all_resources: Dict[AgentId, IAgent] = defaultdict()
    # 使用中的资源,是所有资源的子集,用于记录当前正在使用的资源
    self._locked_resources: Dict[AgentId, IAgent] = defaultdict()

add_agent

add_agent(agent: IAgent)

添加智能体到资源池

参数:

名称 类型 描述 默认
agent IAgent

智能体

必需

引发:

类型 描述
AssertionError

如果资源池已满,则无法继续添加智能体

源代码位于: logis/biz/sim/agent/iface/pool.py
def add_agent(self, agent: IAgent):
    """
    添加智能体到资源池

    Args:
        agent: 智能体

    Raises:
        AssertionError: 如果资源池已满,则无法继续添加智能体

    """
    if self.capacity is not None:
        assert (
            len(self._all_resources) < self.capacity
        ), f"{self}资源池已满{self.capacity},无法添加资源"
    self._all_resources[agent.id] = agent

    if self.use_simpy_store:
        self.__store__.put(agent)

after_resource_released

after_resource_released(resource: AgentClass, **kwargs)

资源释放之后的回调

参数:

名称 类型 描述 默认
resource AgentClass

已释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def after_resource_released(self, resource: AgentClass, **kwargs):
    """
    资源释放之后的回调

    Args:
        resource: 已释放的资源
    """
    raise NotImplementedError("after_resource_released not implemented")

after_resource_requested abstractmethod

after_resource_requested(resource: AgentClass, **kwargs)

资源申请到之后的回调

参数:

名称 类型 描述 默认
resource AgentClass

已申请到的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def after_resource_requested(self, resource: AgentClass, **kwargs):
    """
    资源申请到之后的回调

    Args:
        resource: 已申请到的资源
    """
    pass

assign_resources abstractmethod

assign_resources(task_type: str, task_keys: List[T], **kwargs) -> Generator[simpy.Event, Any, Dict[AgentId, AgentManifest[AgentClass, TaskType]]]

输入任务分配智能体

参数:

名称 类型 描述 默认
task_type str

任务类型

必需
task_keys List[T]

任务key列表

必需
kwargs

其他参数

{}

返回:

类型 描述
Dict[AgentId, AgentManifest[AgentClass, TaskType]]

智能体id到智能体manifest的映射

源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def assign_resources(
    self, task_type: str, task_keys: List[T], **kwargs
) -> Generator[
    simpy.Event, Any, Dict[AgentId, "AgentManifest[AgentClass, TaskType]"]
]:
    """
    输入任务分配智能体

    Args:
        task_type: 任务类型
        task_keys: 任务key列表
        kwargs: 其他参数

    Returns:
        智能体id到智能体manifest的映射
    """

before_release_resource

before_release_resource(resource: AgentClass, **kwargs)

资源释放之前的回调

参数:

名称 类型 描述 默认
resource AgentClass

待释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def before_release_resource(self, resource: AgentClass, **kwargs):
    """
    资源释放之前的回调

    Args:
        resource: 待释放的资源
    """
    raise NotImplementedError("before_release_resource not implemented")

before_request_resource

before_request_resource(**kwargs)

资源申请之前的回调

源代码位于: logis/biz/sim/agent/iface/pool.py
def before_request_resource(self, **kwargs):
    """
    资源申请之前的回调
    """
    raise NotImplementedError("before_request_resource not implemented")

cancel_release_resource

cancel_release_resource(req: Event)

取消释放资源,默认仅实现了use_simpy_store模式

参数:

名称 类型 描述 默认
req Event

待取消的释放事件

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def cancel_release_resource(self, req: simpy.Event):
    """
    取消释放资源,默认仅实现了use_simpy_store模式

    Args:
        req: 待取消的释放事件
    """
    if self.use_simpy_store:
        self.__store__.put_queue.remove(req)
    else:
        raise NotImplementedError("cancel_release_resource not implemented")

cancel_request_resource

cancel_request_resource(req: Event)

取消申请资源,默认仅实现了use_simpy_store模式

参数:

名称 类型 描述 默认
req Event

待取消的申请事件

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def cancel_request_resource(self, req: simpy.Event):
    """
    取消申请资源,默认仅实现了use_simpy_store模式

    Args:
        req: 待取消的申请事件
    """
    if self.use_simpy_store:
        self.__store__.get_queue.remove(req)
    else:
        raise NotImplementedError("cancel_request_resource not implemented")

do_release_resource

do_release_resource(resource: AgentClass, **kwargs)

真正地执行释放资源

参数:

名称 类型 描述 默认
resource AgentClass

待释放的资源

必需
源代码位于: logis/biz/sim/agent/iface/pool.py
def do_release_resource(self, resource: AgentClass, **kwargs):
    """
    真正地执行释放资源

    Args:
        resource: 待释放的资源
    """
    if self.use_simpy_store:
        assert self.__store__, "未初始化资源池,无法释放资源"
        return self.__store__.put(resource)
    else:
        raise NotImplementedError("do_release_resource not implemented")

do_request_resource

do_request_resource(**kwargs) -> Generator[simpy.Event, Any, Optional[AgentClass]]

真正地执行申请资源

源代码位于: logis/biz/sim/agent/iface/pool.py
def do_request_resource(
    self, **kwargs
) -> Generator[simpy.Event, Any, Optional[AgentClass]]:
    """
    真正地执行申请资源
    """
    if self.use_simpy_store:
        assert self.__store__, "未初始化资源池,无法申请资源"
        return self.__store__.get()
    else:
        raise NotImplementedError("do_request_resource not implemented")

get_resource_by_id

get_resource_by_id(id: AgentId)

根据智能体id获取智能体

参数:

名称 类型 描述 默认
id AgentId

智能体id

必需

返回:

类型 描述

Optional[IAgent]: 智能体

源代码位于: logis/biz/sim/agent/iface/pool.py
def get_resource_by_id(self, id: AgentId):
    """
    根据智能体id获取智能体

    Args:
        id: 智能体id

    Returns:
        Optional[IAgent]: 智能体
    """
    return self._all_resources.get(id, None)

init_simpy_store

init_simpy_store()

初始化simpy资源池

源代码位于: logis/biz/sim/agent/iface/pool.py
def init_simpy_store(self):
    """
    初始化simpy资源池
    """
    self.__store__ = simpy.Store(self.env, self.capacity)

is_locked

is_locked(resource: AgentClass) -> bool

判断资源是否已被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待判断的资源

必需

返回:

名称 类型 描述
bool bool

是否已被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def is_locked(self, resource: AgentClass) -> bool:
    """
    判断资源是否已被锁定

    Args:
        resource: 待判断的资源

    Returns:
        bool: 是否已被锁定
    """
    return resource.id in self._locked_resources

is_type_of

is_type_of(t: Type) -> bool

判断资源池是否是指定类型的资源

参数:

名称 类型 描述 默认
t Type

资源类型

必需

返回:

名称 类型 描述
bool bool

是否是指定类型的资源

源代码位于: logis/biz/sim/agent/iface/pool.py
def is_type_of(self, t: Type) -> bool:
    """
    判断资源池是否是指定类型的资源

    Args:
        t: 资源类型

    Returns:
        bool: 是否是指定类型的资源
    """
    if not self.resources:
        return False
    return isinstance(self.resources[0], t)

release_resource abstractmethod

release_resource(resource: AgentClass, *args, **kwargs)

释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用: 1. before_release_resource 2. do_release_resource 3. after_resource_released

如果不满足您的需求,请自定义IAgentSelectionStrategy策略并实现release方法

参数:

名称 类型 描述 默认
resource AgentClass

要释放的资源

必需
kwargs

其他参数

{}
源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def release_resource(self, resource: AgentClass, *args, **kwargs):
    """
    释放资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
    1. before_release_resource
    2. do_release_resource
    3. after_resource_released

    如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`release`方法

    Args:
        resource: 要释放的资源
        kwargs: 其他参数
    """

    agent_pool = self
    agent_pool.before_release_resource(resource=resource, **kwargs)
    e = agent_pool.do_release_resource(resource=resource)
    # TODO: 这里是否不等待就可以
    yield e
    agent_pool.after_resource_released(resource=resource, **kwargs)
    return True

request_resource abstractmethod

request_resource(fast_fail: bool = False, **kwargs) -> Generator[simpy.Event, Any, Optional[IAgent]]

申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用: 1. before_request_resource 2. do_request_resource 3. after_resource_requested

如果不满足您的需求,请自定义IAgentSelectionStrategy策略并实现request方法

参数:

名称 类型 描述 默认
strategy

智能体选择策略

必需
fast_fail bool

是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None

False
kwargs

其他参数

{}

返回:

类型 描述
Optional[IAgent]

智能体事件生成器

源代码位于: logis/biz/sim/agent/iface/pool.py
@abstractmethod
def request_resource(
    self,
    fast_fail: bool = False,
    **kwargs,
) -> Generator[simpy.Event, Any, Optional[IAgent]]:
    """
    申请资源。此方法是个简单的方法组合(可能并不通用),内部会依次调用:
    1. before_request_resource
    2. do_request_resource
    3. after_resource_requested

    如果不满足您的需求,请自定义`IAgentSelectionStrategy`策略并实现`request`方法

    Args:
        strategy: 智能体选择策略
        fast_fail: 是否快速失败,如果为True,当资源池中没有可用资源时,直接返回None
        kwargs: 其他参数

    Returns:
        智能体事件生成器
    """

    if fast_fail and self.available_quantity <= 0:
        return None

    self.before_request_resource(**kwargs)
    try:
        req = self.do_request_resource()
        resource: Optional["IAgent"] = yield req
    except simpy.Interrupt:
        self.cancel_request_resource(req)
        resource = None

    if resource:
        self.after_resource_requested(resource=resource, **kwargs)
    return resource

set_locked

set_locked(resource: AgentClass, **kwargs)

标记资源为已被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待标记的资源

必需
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功标记资源为已被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def set_locked(self, resource: AgentClass, **kwargs):
    """
    标记资源为已被锁定

    Args:
        resource: 待标记的资源
        kwargs: 其他参数

    Returns:
        bool: 是否成功标记资源为已被锁定
    """
    self._locked_resources[resource.id] = resource

    return True

unset_locked

unset_locked(resource: AgentClass, **kwargs)

取消标记资源为未被锁定

参数:

名称 类型 描述 默认
resource AgentClass

待取消标记的资源

必需
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功取消标记资源为未被锁定

源代码位于: logis/biz/sim/agent/iface/pool.py
def unset_locked(self, resource: AgentClass, **kwargs):
    """
    取消标记资源为未被锁定

    Args:
        resource: 待取消标记的资源
        kwargs: 其他参数

    Returns:
        bool: 是否成功取消标记资源为未被锁定
    """
    self._locked_resources.pop(resource.id, None)
    return True

IAgentSelectionStrategy

智能体选择策略

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
class IAgentSelectionStrategy(IExpose):
    """
    智能体选择策略
    """

    def __init__(self, agent_pool: Optional[IAgentPool] = None, **kwargs) -> None:
        """
        初始化智能体选择策略

        Args:
            agent_pool: 智能体池
            kwargs: 其他参数
        """
        super().__init__(**kwargs)
        self.agent_pool = agent_pool

    @abstractmethod
    def request(
        self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
    ) -> Optional[IAgent]:
        """
        选择符合需求的智能体

        Args:
            agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
            fast_fail: 是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None
            kwargs: 其他参数

        Returns:
            Optional[IAgent]: 符合需求的智能体
        """

    def release(self, agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs):
        """
        释放智能体

        Args:
            agent: 智能体
            agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
            kwargs: 其他参数

        Returns:
            bool: 是否成功释放智能体
        """
        agent_pool = agent_pool or self.agent_pool
        assert agent_pool, "未指定资源池,无法释放资源"
        agent_pool.before_release_resource(resource=agent, **kwargs)
        e = agent_pool.do_release_resource(resource=agent)
        # TODO: 这里是否不等待就可以
        yield e
        agent_pool.after_resource_released(resource=agent, **kwargs)
        return True

agent_pool instance-attribute

agent_pool = agent_pool

__init__

__init__(agent_pool: Optional[IAgentPool] = None, **kwargs) -> None

初始化智能体选择策略

参数:

名称 类型 描述 默认
agent_pool Optional[IAgentPool]

智能体池

None
kwargs

其他参数

{}
源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
def __init__(self, agent_pool: Optional[IAgentPool] = None, **kwargs) -> None:
    """
    初始化智能体选择策略

    Args:
        agent_pool: 智能体池
        kwargs: 其他参数
    """
    super().__init__(**kwargs)
    self.agent_pool = agent_pool

release

release(agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs)

释放智能体

参数:

名称 类型 描述 默认
agent IAgent

智能体

必需
agent_pool Optional[IAgentPool]

智能体池,如果不传,默认使用初始化时指定的智能体池

None
kwargs

其他参数

{}

返回:

名称 类型 描述
bool

是否成功释放智能体

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
def release(self, agent: IAgent, agent_pool: Optional[IAgentPool] = None, **kwargs):
    """
    释放智能体

    Args:
        agent: 智能体
        agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
        kwargs: 其他参数

    Returns:
        bool: 是否成功释放智能体
    """
    agent_pool = agent_pool or self.agent_pool
    assert agent_pool, "未指定资源池,无法释放资源"
    agent_pool.before_release_resource(resource=agent, **kwargs)
    e = agent_pool.do_release_resource(resource=agent)
    # TODO: 这里是否不等待就可以
    yield e
    agent_pool.after_resource_released(resource=agent, **kwargs)
    return True

request abstractmethod

request(agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs) -> Optional[IAgent]

选择符合需求的智能体

参数:

名称 类型 描述 默认
agent_pool Optional[IAgentPool]

智能体池,如果不传,默认使用初始化时指定的智能体池

None
fast_fail bool

是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None

False
kwargs

其他参数

{}

返回:

类型 描述
Optional[IAgent]

Optional[IAgent]: 符合需求的智能体

源代码位于: logis/biz/sim/agent/iface/agent_apply_strategy.py
@abstractmethod
def request(
    self, agent_pool: Optional[IAgentPool] = None, fast_fail: bool = False, **kwargs
) -> Optional[IAgent]:
    """
    选择符合需求的智能体

    Args:
        agent_pool: 智能体池,如果不传,默认使用初始化时指定的智能体池
        fast_fail: 是否快速失败,如果为True,且当前智能体池没有可用智能体,则直接返回None
        kwargs: 其他参数

    Returns:
        Optional[IAgent]: 符合需求的智能体
    """

IRackSelectionStrategy

货架选择策略,从若干货架中筛选出符合操作需求的货架

源代码位于: logis/biz/sim/storage/iface/select.py
class IRackSelectionStrategy(IExpose):
    """
    货架选择策略,从若干货架中筛选出符合操作需求的货架
    """

    def __init__(
        self, rack_group: Union[RackGroupClass, None] = None, **kwargs
    ) -> None:
        """
        初始化货架选择策略

        Args:
            rack_group: 货架组,如果此策略仅针对特定货架组则建议传,否则忽略即可
        """
        super().__init__(**kwargs)
        self.rack_group = rack_group

    @abstractmethod
    def select_racks(
        self,
        operation: OperationType,
        stock: IStock,
        rack_group: Optional[RackGroupClass] = None,
        **kwargs,
    ) -> List[IRack]:
        """
        从所有可操作的货架中筛选出符合操作需求的货架

        Args:
            operation: 操作类型
            stocks: 货物列表
            rack_group: 货架组,如果不传则使用初始化时传的货架组

        Returns:
            符合操作需求的货架列表
        """
        rack_group = rack_group or self.rack_group
        assert rack_group is not None, "未传入货架组,无法选择货架"
        result: List[RackClass] = []
        for rack in rack_group.racks:
            if not rack.is_able_to(operation, stock):
                continue
            result.append(rack)
        return result

rack_group instance-attribute

rack_group = rack_group

__init__

__init__(rack_group: Union[RackGroupClass, None] = None, **kwargs) -> None

初始化货架选择策略

参数:

名称 类型 描述 默认
rack_group Union[RackGroupClass, None]

货架组,如果此策略仅针对特定货架组则建议传,否则忽略即可

None
源代码位于: logis/biz/sim/storage/iface/select.py
def __init__(
    self, rack_group: Union[RackGroupClass, None] = None, **kwargs
) -> None:
    """
    初始化货架选择策略

    Args:
        rack_group: 货架组,如果此策略仅针对特定货架组则建议传,否则忽略即可
    """
    super().__init__(**kwargs)
    self.rack_group = rack_group

select_racks abstractmethod

select_racks(operation: OperationType, stock: IStock, rack_group: Optional[RackGroupClass] = None, **kwargs) -> List[IRack]

从所有可操作的货架中筛选出符合操作需求的货架

参数:

名称 类型 描述 默认
operation OperationType

操作类型

必需
stocks

货物列表

必需
rack_group Optional[RackGroupClass]

货架组,如果不传则使用初始化时传的货架组

None

返回:

类型 描述
List[IRack]

符合操作需求的货架列表

源代码位于: logis/biz/sim/storage/iface/select.py
@abstractmethod
def select_racks(
    self,
    operation: OperationType,
    stock: IStock,
    rack_group: Optional[RackGroupClass] = None,
    **kwargs,
) -> List[IRack]:
    """
    从所有可操作的货架中筛选出符合操作需求的货架

    Args:
        operation: 操作类型
        stocks: 货物列表
        rack_group: 货架组,如果不传则使用初始化时传的货架组

    Returns:
        符合操作需求的货架列表
    """
    rack_group = rack_group or self.rack_group
    assert rack_group is not None, "未传入货架组,无法选择货架"
    result: List[RackClass] = []
    for rack in rack_group.racks:
        if not rack.is_able_to(operation, stock):
            continue
        result.append(rack)
    return result

ITask

任务基础抽象类

源代码位于: logis/task/model/__init__.py
class ITask(metaclass=ABCMeta):
    """
    任务基础抽象类
    """

    @abstractmethod
    def get_task_id(self) -> TaskId:
        pass

    @abstractmethod
    def get_priority(self) -> TaskPriority:
        """
        任务优先级
        """
        pass

    @abstractmethod
    def update_status(self, status: TaskStatus):
        """
        更新任务状态
        """
        pass

    @abstractmethod
    def is_status_at(self, status: TaskStatus) -> bool:
        """
        检查任务状态是否为指定值
        """
        pass

    @property
    def finished(self) -> bool:
        return self.is_status_at(TaskStatus.FINISHED)

finished property

finished: bool

get_priority abstractmethod

get_priority() -> TaskPriority

任务优先级

源代码位于: logis/task/model/__init__.py
@abstractmethod
def get_priority(self) -> TaskPriority:
    """
    任务优先级
    """
    pass

get_task_id abstractmethod

get_task_id() -> TaskId
源代码位于: logis/task/model/__init__.py
@abstractmethod
def get_task_id(self) -> TaskId:
    pass

is_status_at abstractmethod

is_status_at(status: TaskStatus) -> bool

检查任务状态是否为指定值

源代码位于: logis/task/model/__init__.py
@abstractmethod
def is_status_at(self, status: TaskStatus) -> bool:
    """
    检查任务状态是否为指定值
    """
    pass

update_status abstractmethod

update_status(status: TaskStatus)

更新任务状态

源代码位于: logis/task/model/__init__.py
@abstractmethod
def update_status(self, status: TaskStatus):
    """
    更新任务状态
    """
    pass

ITransportBlueprint

搬运蓝图基类

源代码位于: logis/biz/sim/transport/iface/__init__.py
class ITransportBlueprint(IBlueprint):
    """
    搬运蓝图基类
    """

    def __init__(self, entity: Entity, *args, **kwargs):
        super().__init__(entity, *args, **kwargs)

        # 位置相关
        self.pickup_location_id: Optional[str] = none_if_in(
            entity.properties.get("取料位置", entity.properties.get("取货位置")),
            "-1",
            "null",
        )
        self.destination_id: str = none_if_in(
            entity.properties.get("选择目的地") or entity.properties.get("目的地"),
            "-1",
            "null",
        )
        assert self.destination_id, f"{self.name}未设置目的地"

        self.destination_selection_strategy: Optional[str] = none_if_in(
            entity.properties.get("目的地选择策略")
            or entity.properties.get("工作站选择策略"),
            "-1",
            "null",
        )
        """目的地选择策略"""
        self.__destination_strategy__: Optional[IRackSelectionStrategy] = None
        self.retrieval_location_selection_strategy: Optional[str] = (
            entity.properties.get("取料位置选择策略")
        )
        """取料位置选择策略"""
        self.__pickup_strategy__: Optional[IRackSelectionStrategy] = None

        self.transport_resource_id: str = none_if_in(
            entity.properties.get("选择搬运资源"), "-1", "null"
        )
        assert self.transport_resource_id, f"{self.name}未设置搬运资源"

        _, v = get_the_first_existent_key(
            entity.properties, "搬运资源选择策略", "选择搬运资源策略"
        )
        self.agent_selection_strategy_name: Optional[AgentSelectionStrategyName] = v
        """搬运资源选择策略"""
        self.__agent_selection_strategy__: Optional[IAgentSelectionStrategy] = None

        self.loading_time = Time.parse_str(entity.properties.get("装载时间", "0|秒"))
        """装载时间"""
        self.unloading_time = Time.parse_str(entity.properties.get("卸载时间", "0|秒"))
        """卸载时间"""
        self._moving_speed = none_if_in(
            entity.properties.get("移动速度"), "DefaultValue"
        )

        # 资源释放策略
        release_str: str = entity.properties.get("释放资源后", "")
        release_config = release_str.split("|")
        self.after_release_resource: AgentIdleStrategyOption = (
            release_config[0] if len(release_config) > 0 else ""
        )
        self.go_home_frequency: GoHomeStrategyFrequency = (
            release_config[1].split(":")[1] if len(release_config) > 1 else ""
        )
        self.__agent_idle_strategy__: Optional[AgentIdleStrategy] = None

        self.pathfinding_alg_name: str = entity.properties.get("寻路算法", "A*")
        """寻路算法名,默认A*算法"""
        self.__path_finding_strategy__: Optional[PathFindingAlgorithm] = None

        self._pickup_task_manifest = TaskManifest(component_id=self.create_edit_id)
        """取料任务清单"""
        self._delivery_task_manifest = TaskManifest(component_id=self.create_edit_id)
        """交付任务清单"""

        self._pickup_lock = simpy.Resource(self.env, capacity=1)
        """取料任务锁"""
        self._delivery_lock = simpy.Resource(self.env, capacity=1)
        """交付任务锁"""

    def infer_working_stage(self, stocks: List["IStock"]):
        """
        获取当前工作阶段
        """
        demo_stock = first(stocks, None)

        if not demo_stock:
            return None
        return demo_stock.__stage__

    def get_task_manifest(self, stage: Literal["pickup", "delivery"]) -> TaskManifest:
        """
        由于搬运具有取料和交付两种任务,
        因此需要分别设置取料任务清单和交付任务清单。

        Args:
            stage (Literal["pickup", "delivery"]): 任务阶段

        Returns:
            TaskManifest: 任务清单
        """
        if stage == "pickup":
            return self._pickup_task_manifest
        elif stage == "delivery":
            return self._delivery_task_manifest
        else:
            raise ValueError(f"{self.name}未知任务阶段: {stage}")

    @property
    @abstractmethod
    def pickup_location(self) -> Optional["Location"]:
        """
        取料位置
        """
        from logis.biz.sim import ILocationGetter

        inst = self.context.resolve_code_strategy(
            self.pickup_location_id, ILocationGetter, ctx=self.context
        )
        return inst.get() if inst else None

    @property
    @abstractmethod
    def destination(self) -> Optional["Location"]:
        """
        目的地
        """
        from logis.biz.sim import ILocationGetter

        inst = self.context.resolve_code_strategy(
            self.destination_id, ILocationGetter, ctx=self.context
        )
        return inst.get() if inst else None

    def get_pickup_strategy(self, **kwargs):
        """
        取料位置选择策略
        如果没有设置取料位置选择策略,尝试从前一个蓝图块获取
        """
        if self.__pickup_strategy__:
            return self.__pickup_strategy__

        from logis.biz.sim import BlueprintKind

        strategy = self.retrieval_location_selection_strategy

        # 兼容历史逻辑:如果没有取料位置选择策略,尝试从前一个蓝图块获取
        if not strategy:
            for node in self.previous_nodes(direct=True):
                if node.is_kind_of(BlueprintKind.ENTER):
                    # Enter节点可能有相关的策略
                    strategy = node.selection_policy
                    if strategy:
                        break
                elif node.is_kind_of(BlueprintKind.SOURCE):
                    # Source节点可能有相关的策略
                    strategy = node.generate_strategy
                    if strategy:
                        break
        self.__pickup_strategy__ = self.context.resolve_code_strategy(
            strategy,
            IRackSelectionStrategy,
            ctx=self.context,
        )
        return self.__pickup_strategy__

    def get_destination_strategy(self, **kwargs) -> Optional["IRackSelectionStrategy"]:
        """
        目的地选择策略
        """
        if not self.__destination_strategy__:
            self.__destination_strategy__ = self.context.resolve_code_strategy(
                self.destination_selection_strategy,
                IRackSelectionStrategy,
                ctx=self.context,
            )
        return self.__destination_strategy__

    @deprecated("use get_destination_strategy instead")
    def get_rack_selection_strategy(
        self, **kwargs
    ) -> Optional["IRackSelectionStrategy"]:
        """
        货架选择策略(原目的地选择策略)
        """
        return self.get_destination_strategy(**kwargs)

    def get_path_finding_strategy(self, **kwargs) -> Union[PathFindingAlgorithm, None]:
        """
        路径规划策略
        """
        if self.__path_finding_strategy__:
            return self.__path_finding_strategy__

        from logis.alg.path_finding import AStarPathFinding

        alg_class = find_algorithm(
            default_algorithm_matcher,
            alg_name=self.pathfinding_alg_name.lower() or "a_star",
            space=dict(a_star=AStarPathFinding) | {"a*": AStarPathFinding},
        )

        if alg_class:
            self.__path_finding_strategy__ = alg_class()
        else:
            self.__path_finding_strategy__ = self.context.resolve_code_strategy(
                self.pathfinding_alg_name, PathFindingAlgorithm, ctx=self.context
            )

        return self.__path_finding_strategy__

    def get_agent_selection_strategy(
        self, **kwargs
    ) -> Optional["IAgentSelectionStrategy"]:
        """
        智能体选择策略
        """
        if not self.__agent_selection_strategy__:
            self.__agent_selection_strategy__ = self.context.resolve_code_strategy(
                self.agent_selection_strategy_name,
                IAgentSelectionStrategy,
                ctx=self.context,
                agent_pool=self.transport_resource,
            )

        if self.__agent_selection_strategy__ is None:
            from logis.biz.sim.agent import DefaultAgentSelectionStrategy

            self.__agent_selection_strategy__ = DefaultAgentSelectionStrategy(
                ctx=self.context, agent_pool=self.transport_resource
            )
        return self.__agent_selection_strategy__

    def can_return_original_place(self, agent: Union["IAgent"], **kwargs):
        """
        检查指定智能体是否可以返回原始位置

        Args:
            agent: 智能体

        Returns:
            bool: 是否可以返回原始位置
        """
        if not agent.is_task_all_done():
            return False

        if self.go_home_frequency == "如果无其他任务":
            # FIXME: 此处判断逻辑待完善
            return self.get_task_manifest("delivery").no_stock_task_left()

        return True

    def get_agent_idle_strategy(self, **kwargs) -> Optional["AgentIdleStrategy"]:
        """
        智能体空闲策略
        """
        if self.__agent_idle_strategy__ is not None:
            return self.__agent_idle_strategy__
        if self.after_release_resource == "返回到归属地位置":
            from logis.biz.sim.agent import GoHomeStrategy

            self.__agent_idle_strategy__ = GoHomeStrategy(
                frequency=self.go_home_frequency, env=self.env, ctx=self.context
            )
        elif self.after_release_resource == "停留在原地":
            self.__agent_idle_strategy__ = None
        else:
            self.__agent_idle_strategy__ = self.context.resolve_code_strategy(
                self.after_release_resource,
                AgentIdleStrategy,
                ctx=self.context,
            )
        return self.__agent_idle_strategy__

    @property
    @abstractmethod
    def transport_resource(self) -> Optional["IAgentPool"]:
        """
        所使用的搬运资源
        """
        pass

    @property
    @abstractmethod
    def moving_speed(self) -> Optional[Speed]:
        """
        移动速度
        """
        pass

    def assign_target_location(
        self, *stocks: "IStock", **kwargs
    ) -> Generator[simpy.Event, Any, Tuple[List["IStock"], Optional["IStock"]]]:
        """
        为货物分配目标位置

        Args:
            stocks: 货物列表

        Returns:
            Tuple[List["Stock"], Optional["Stock"]]: 成功的货物列表、未分配的货物
        """
        raise NotImplementedError("assign_target_location not implemented")

    def transport_item(
        self,
        object: Union[List["IStock"], "IStock"],
        task_type: str,
        order: Optional["ITask"] = None,
        agent: Optional["IAgent"] = None,
        **kwargs,
    ) -> Generator[simpy.Event, Any, Optional["IStock"]]:
        """
        单次运输货物

        Args:
            object: 待运输目标
            task_type: 任务类型
            order: 对应的订单,如果所有货物属于同一个订单,否则应从货物中获取order_id
            agent: 所使用的智能体,如果不传,则应在此方法内部分配、释放智能体

        Returns:
            Optional["IStock"]: 运输成功返回运输的货物,否则返回None

        Yields:
            simpy.Event: 运输事件
        """

        raise NotImplementedError("transport_item not implemented")

__agent_idle_strategy__ instance-attribute

__agent_idle_strategy__: Optional[AgentIdleStrategy] = None

__agent_selection_strategy__ instance-attribute

__agent_selection_strategy__: Optional[IAgentSelectionStrategy] = None

__destination_strategy__ instance-attribute

__destination_strategy__: Optional[IRackSelectionStrategy] = None

__path_finding_strategy__ instance-attribute

__path_finding_strategy__: Optional[PathFindingAlgorithm] = None

__pickup_strategy__ instance-attribute

__pickup_strategy__: Optional[IRackSelectionStrategy] = None

after_release_resource instance-attribute

after_release_resource: AgentIdleStrategyOption = release_config[0] if len(release_config) > 0 else ''

agent_selection_strategy_name instance-attribute

agent_selection_strategy_name: Optional[AgentSelectionStrategyName] = v

搬运资源选择策略

destination abstractmethod property

destination: Optional[Location]

目的地

destination_id instance-attribute

destination_id: str = none_if_in(get('选择目的地') or get('目的地'), '-1', 'null')

destination_selection_strategy instance-attribute

destination_selection_strategy: Optional[str] = none_if_in(get('目的地选择策略') or get('工作站选择策略'), '-1', 'null')

目的地选择策略

go_home_frequency instance-attribute

go_home_frequency: GoHomeStrategyFrequency = split(':')[1] if len(release_config) > 1 else ''

loading_time instance-attribute

loading_time = parse_str(get('装载时间', '0|秒'))

装载时间

moving_speed abstractmethod property

moving_speed: Optional[Speed]

移动速度

pathfinding_alg_name instance-attribute

pathfinding_alg_name: str = get('寻路算法', 'A*')

寻路算法名,默认A*算法

pickup_location abstractmethod property

pickup_location: Optional[Location]

取料位置

pickup_location_id instance-attribute

pickup_location_id: Optional[str] = none_if_in(get('取料位置', get('取货位置')), '-1', 'null')

retrieval_location_selection_strategy instance-attribute

retrieval_location_selection_strategy: Optional[str] = get('取料位置选择策略')

取料位置选择策略

transport_resource abstractmethod property

transport_resource: Optional[IAgentPool]

所使用的搬运资源

transport_resource_id instance-attribute

transport_resource_id: str = none_if_in(get('选择搬运资源'), '-1', 'null')

unloading_time instance-attribute

unloading_time = parse_str(get('卸载时间', '0|秒'))

卸载时间

__init__

__init__(entity: ComponentForm, *args, **kwargs)
源代码位于: logis/biz/sim/transport/iface/__init__.py
def __init__(self, entity: Entity, *args, **kwargs):
    super().__init__(entity, *args, **kwargs)

    # 位置相关
    self.pickup_location_id: Optional[str] = none_if_in(
        entity.properties.get("取料位置", entity.properties.get("取货位置")),
        "-1",
        "null",
    )
    self.destination_id: str = none_if_in(
        entity.properties.get("选择目的地") or entity.properties.get("目的地"),
        "-1",
        "null",
    )
    assert self.destination_id, f"{self.name}未设置目的地"

    self.destination_selection_strategy: Optional[str] = none_if_in(
        entity.properties.get("目的地选择策略")
        or entity.properties.get("工作站选择策略"),
        "-1",
        "null",
    )
    """目的地选择策略"""
    self.__destination_strategy__: Optional[IRackSelectionStrategy] = None
    self.retrieval_location_selection_strategy: Optional[str] = (
        entity.properties.get("取料位置选择策略")
    )
    """取料位置选择策略"""
    self.__pickup_strategy__: Optional[IRackSelectionStrategy] = None

    self.transport_resource_id: str = none_if_in(
        entity.properties.get("选择搬运资源"), "-1", "null"
    )
    assert self.transport_resource_id, f"{self.name}未设置搬运资源"

    _, v = get_the_first_existent_key(
        entity.properties, "搬运资源选择策略", "选择搬运资源策略"
    )
    self.agent_selection_strategy_name: Optional[AgentSelectionStrategyName] = v
    """搬运资源选择策略"""
    self.__agent_selection_strategy__: Optional[IAgentSelectionStrategy] = None

    self.loading_time = Time.parse_str(entity.properties.get("装载时间", "0|秒"))
    """装载时间"""
    self.unloading_time = Time.parse_str(entity.properties.get("卸载时间", "0|秒"))
    """卸载时间"""
    self._moving_speed = none_if_in(
        entity.properties.get("移动速度"), "DefaultValue"
    )

    # 资源释放策略
    release_str: str = entity.properties.get("释放资源后", "")
    release_config = release_str.split("|")
    self.after_release_resource: AgentIdleStrategyOption = (
        release_config[0] if len(release_config) > 0 else ""
    )
    self.go_home_frequency: GoHomeStrategyFrequency = (
        release_config[1].split(":")[1] if len(release_config) > 1 else ""
    )
    self.__agent_idle_strategy__: Optional[AgentIdleStrategy] = None

    self.pathfinding_alg_name: str = entity.properties.get("寻路算法", "A*")
    """寻路算法名,默认A*算法"""
    self.__path_finding_strategy__: Optional[PathFindingAlgorithm] = None

    self._pickup_task_manifest = TaskManifest(component_id=self.create_edit_id)
    """取料任务清单"""
    self._delivery_task_manifest = TaskManifest(component_id=self.create_edit_id)
    """交付任务清单"""

    self._pickup_lock = simpy.Resource(self.env, capacity=1)
    """取料任务锁"""
    self._delivery_lock = simpy.Resource(self.env, capacity=1)
    """交付任务锁"""

assign_target_location

assign_target_location(*stocks: IStock, **kwargs) -> Generator[simpy.Event, Any, Tuple[List[IStock], Optional[IStock]]]

为货物分配目标位置

参数:

名称 类型 描述 默认
stocks IStock

货物列表

()

返回:

类型 描述
Tuple[List[IStock], Optional[IStock]]

Tuple[List["Stock"], Optional["Stock"]]: 成功的货物列表、未分配的货物

源代码位于: logis/biz/sim/transport/iface/__init__.py
def assign_target_location(
    self, *stocks: "IStock", **kwargs
) -> Generator[simpy.Event, Any, Tuple[List["IStock"], Optional["IStock"]]]:
    """
    为货物分配目标位置

    Args:
        stocks: 货物列表

    Returns:
        Tuple[List["Stock"], Optional["Stock"]]: 成功的货物列表、未分配的货物
    """
    raise NotImplementedError("assign_target_location not implemented")

can_return_original_place

can_return_original_place(agent: Union[IAgent], **kwargs)

检查指定智能体是否可以返回原始位置

参数:

名称 类型 描述 默认
agent Union[IAgent]

智能体

必需

返回:

名称 类型 描述
bool

是否可以返回原始位置

源代码位于: logis/biz/sim/transport/iface/__init__.py
def can_return_original_place(self, agent: Union["IAgent"], **kwargs):
    """
    检查指定智能体是否可以返回原始位置

    Args:
        agent: 智能体

    Returns:
        bool: 是否可以返回原始位置
    """
    if not agent.is_task_all_done():
        return False

    if self.go_home_frequency == "如果无其他任务":
        # FIXME: 此处判断逻辑待完善
        return self.get_task_manifest("delivery").no_stock_task_left()

    return True

get_agent_idle_strategy

get_agent_idle_strategy(**kwargs) -> Optional[AgentIdleStrategy]

智能体空闲策略

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_agent_idle_strategy(self, **kwargs) -> Optional["AgentIdleStrategy"]:
    """
    智能体空闲策略
    """
    if self.__agent_idle_strategy__ is not None:
        return self.__agent_idle_strategy__
    if self.after_release_resource == "返回到归属地位置":
        from logis.biz.sim.agent import GoHomeStrategy

        self.__agent_idle_strategy__ = GoHomeStrategy(
            frequency=self.go_home_frequency, env=self.env, ctx=self.context
        )
    elif self.after_release_resource == "停留在原地":
        self.__agent_idle_strategy__ = None
    else:
        self.__agent_idle_strategy__ = self.context.resolve_code_strategy(
            self.after_release_resource,
            AgentIdleStrategy,
            ctx=self.context,
        )
    return self.__agent_idle_strategy__

get_agent_selection_strategy

get_agent_selection_strategy(**kwargs) -> Optional[IAgentSelectionStrategy]

智能体选择策略

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_agent_selection_strategy(
    self, **kwargs
) -> Optional["IAgentSelectionStrategy"]:
    """
    智能体选择策略
    """
    if not self.__agent_selection_strategy__:
        self.__agent_selection_strategy__ = self.context.resolve_code_strategy(
            self.agent_selection_strategy_name,
            IAgentSelectionStrategy,
            ctx=self.context,
            agent_pool=self.transport_resource,
        )

    if self.__agent_selection_strategy__ is None:
        from logis.biz.sim.agent import DefaultAgentSelectionStrategy

        self.__agent_selection_strategy__ = DefaultAgentSelectionStrategy(
            ctx=self.context, agent_pool=self.transport_resource
        )
    return self.__agent_selection_strategy__

get_destination_strategy

get_destination_strategy(**kwargs) -> Optional[IRackSelectionStrategy]

目的地选择策略

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_destination_strategy(self, **kwargs) -> Optional["IRackSelectionStrategy"]:
    """
    目的地选择策略
    """
    if not self.__destination_strategy__:
        self.__destination_strategy__ = self.context.resolve_code_strategy(
            self.destination_selection_strategy,
            IRackSelectionStrategy,
            ctx=self.context,
        )
    return self.__destination_strategy__

get_path_finding_strategy

get_path_finding_strategy(**kwargs) -> Union[PathFindingAlgorithm, None]

路径规划策略

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_path_finding_strategy(self, **kwargs) -> Union[PathFindingAlgorithm, None]:
    """
    路径规划策略
    """
    if self.__path_finding_strategy__:
        return self.__path_finding_strategy__

    from logis.alg.path_finding import AStarPathFinding

    alg_class = find_algorithm(
        default_algorithm_matcher,
        alg_name=self.pathfinding_alg_name.lower() or "a_star",
        space=dict(a_star=AStarPathFinding) | {"a*": AStarPathFinding},
    )

    if alg_class:
        self.__path_finding_strategy__ = alg_class()
    else:
        self.__path_finding_strategy__ = self.context.resolve_code_strategy(
            self.pathfinding_alg_name, PathFindingAlgorithm, ctx=self.context
        )

    return self.__path_finding_strategy__

get_pickup_strategy

get_pickup_strategy(**kwargs)

取料位置选择策略 如果没有设置取料位置选择策略,尝试从前一个蓝图块获取

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_pickup_strategy(self, **kwargs):
    """
    取料位置选择策略
    如果没有设置取料位置选择策略,尝试从前一个蓝图块获取
    """
    if self.__pickup_strategy__:
        return self.__pickup_strategy__

    from logis.biz.sim import BlueprintKind

    strategy = self.retrieval_location_selection_strategy

    # 兼容历史逻辑:如果没有取料位置选择策略,尝试从前一个蓝图块获取
    if not strategy:
        for node in self.previous_nodes(direct=True):
            if node.is_kind_of(BlueprintKind.ENTER):
                # Enter节点可能有相关的策略
                strategy = node.selection_policy
                if strategy:
                    break
            elif node.is_kind_of(BlueprintKind.SOURCE):
                # Source节点可能有相关的策略
                strategy = node.generate_strategy
                if strategy:
                    break
    self.__pickup_strategy__ = self.context.resolve_code_strategy(
        strategy,
        IRackSelectionStrategy,
        ctx=self.context,
    )
    return self.__pickup_strategy__

get_rack_selection_strategy

get_rack_selection_strategy(**kwargs) -> Optional[IRackSelectionStrategy]

货架选择策略(原目的地选择策略)

源代码位于: logis/biz/sim/transport/iface/__init__.py
@deprecated("use get_destination_strategy instead")
def get_rack_selection_strategy(
    self, **kwargs
) -> Optional["IRackSelectionStrategy"]:
    """
    货架选择策略(原目的地选择策略)
    """
    return self.get_destination_strategy(**kwargs)

get_task_manifest

get_task_manifest(stage: Literal['pickup', 'delivery']) -> TaskManifest

由于搬运具有取料和交付两种任务, 因此需要分别设置取料任务清单和交付任务清单。

参数:

名称 类型 描述 默认
stage Literal['pickup', 'delivery']

任务阶段

必需

返回:

名称 类型 描述
TaskManifest TaskManifest

任务清单

源代码位于: logis/biz/sim/transport/iface/__init__.py
def get_task_manifest(self, stage: Literal["pickup", "delivery"]) -> TaskManifest:
    """
    由于搬运具有取料和交付两种任务,
    因此需要分别设置取料任务清单和交付任务清单。

    Args:
        stage (Literal["pickup", "delivery"]): 任务阶段

    Returns:
        TaskManifest: 任务清单
    """
    if stage == "pickup":
        return self._pickup_task_manifest
    elif stage == "delivery":
        return self._delivery_task_manifest
    else:
        raise ValueError(f"{self.name}未知任务阶段: {stage}")

infer_working_stage

infer_working_stage(stocks: List[IStock])

获取当前工作阶段

源代码位于: logis/biz/sim/transport/iface/__init__.py
def infer_working_stage(self, stocks: List["IStock"]):
    """
    获取当前工作阶段
    """
    demo_stock = first(stocks, None)

    if not demo_stock:
        return None
    return demo_stock.__stage__

transport_item

transport_item(object: Union[List[IStock], IStock], task_type: str, order: Optional[ITask] = None, agent: Optional[IAgent] = None, **kwargs) -> Generator[simpy.Event, Any, Optional[IStock]]

单次运输货物

参数:

名称 类型 描述 默认
object Union[List[IStock], IStock]

待运输目标

必需
task_type str

任务类型

必需
order Optional[ITask]

对应的订单,如果所有货物属于同一个订单,否则应从货物中获取order_id

None
agent Optional[IAgent]

所使用的智能体,如果不传,则应在此方法内部分配、释放智能体

None

返回:

类型 描述
Optional[IStock]

Optional["IStock"]: 运输成功返回运输的货物,否则返回None

产生:

类型 描述
Event

simpy.Event: 运输事件

源代码位于: logis/biz/sim/transport/iface/__init__.py
def transport_item(
    self,
    object: Union[List["IStock"], "IStock"],
    task_type: str,
    order: Optional["ITask"] = None,
    agent: Optional["IAgent"] = None,
    **kwargs,
) -> Generator[simpy.Event, Any, Optional["IStock"]]:
    """
    单次运输货物

    Args:
        object: 待运输目标
        task_type: 任务类型
        order: 对应的订单,如果所有货物属于同一个订单,否则应从货物中获取order_id
        agent: 所使用的智能体,如果不传,则应在此方法内部分配、释放智能体

    Returns:
        Optional["IStock"]: 运输成功返回运输的货物,否则返回None

    Yields:
        simpy.Event: 运输事件
    """

    raise NotImplementedError("transport_item not implemented")

ITransportDevice

搬运设备(智能体的一种),同时具备移动属性

源代码位于: logis/biz/sim/transport/__init__.py
class ITransportDevice(IAgent):
    """
    搬运设备(智能体的一种),同时具备移动属性
    """

    def is_valid(self) -> bool:
        return True

    @classmethod
    def with_properties(cls, p: TransportProperties, **kwargs):
        inst = cls(**kwargs)
        inst.props = p
        return inst

    props: Optional[TransportProperties] = None

    def load(self, *args, **kwargs):
        """
        装载过程
        """
        t = get_time(self.props.load_distance, self.props.load_speed)
        yield self.env.timeout(t)

    def unload(self, *args, **kwargs):
        """
        卸载过程
        """
        l = self.props.unload_distance or self.props.load_distance
        v = self.props.unload_speed or self.props.load_speed
        t = get_time(l, v)
        yield self.env.timeout(t)

    def move(self, *args, **kwargs):
        """
        移动过程
        """
        d = self.props.distance_vector
        assert d is not None, "coordinates given can't be None"
        yield self.env.timeout(get_time_3d(d, self.props.speed))
        self.props.current_location = copy.copy(self.props.target_location)

    def find_path(self, *args, **kwargs):
        """
        寻找路径
        """
        raise NotImplementedError("find_path method not implemented")

props class-attribute instance-attribute

props: Optional[TransportProperties] = None

find_path

find_path(*args, **kwargs)

寻找路径

源代码位于: logis/biz/sim/transport/__init__.py
def find_path(self, *args, **kwargs):
    """
    寻找路径
    """
    raise NotImplementedError("find_path method not implemented")

is_valid

is_valid() -> bool
源代码位于: logis/biz/sim/transport/__init__.py
def is_valid(self) -> bool:
    return True

load

load(*args, **kwargs)

装载过程

源代码位于: logis/biz/sim/transport/__init__.py
def load(self, *args, **kwargs):
    """
    装载过程
    """
    t = get_time(self.props.load_distance, self.props.load_speed)
    yield self.env.timeout(t)

move

move(*args, **kwargs)

移动过程

源代码位于: logis/biz/sim/transport/__init__.py
def move(self, *args, **kwargs):
    """
    移动过程
    """
    d = self.props.distance_vector
    assert d is not None, "coordinates given can't be None"
    yield self.env.timeout(get_time_3d(d, self.props.speed))
    self.props.current_location = copy.copy(self.props.target_location)

unload

unload(*args, **kwargs)

卸载过程

源代码位于: logis/biz/sim/transport/__init__.py
def unload(self, *args, **kwargs):
    """
    卸载过程
    """
    l = self.props.unload_distance or self.props.load_distance
    v = self.props.unload_speed or self.props.load_speed
    t = get_time(l, v)
    yield self.env.timeout(t)

with_properties classmethod

with_properties(p: TransportProperties, **kwargs)
源代码位于: logis/biz/sim/transport/__init__.py
@classmethod
def with_properties(cls, p: TransportProperties, **kwargs):
    inst = cls(**kwargs)
    inst.props = p
    return inst

ManagedMovable

按照外部manager规划路线进行移动 FIXME: 此方式暂未完全实现

源代码位于: logis/biz/sim/transport/__init__.py
class ManagedMovable(metaclass=ABCMeta):
    """
    按照外部manager规划路线进行移动
    FIXME: 此方式暂未完全实现
    """

    SIGNAL_RESTORE = "restore"

    @abstractmethod
    def get_target_location(self) -> Point:
        pass

    @abstractmethod
    def get_current_location(self) -> Point:
        pass

    @abstractmethod
    def _managed_move_(
        self, end: Point, start: Optional[Point] = None, **kwargs
    ) -> Generator:
        pass

    def __init__(self, *args, **kwargs):
        self.__control_queue__ = Queue()
        self.routine: Queue[Point] = Queue()
        self.__state__: Optional[Literal["paused", "stopped", "active"]] = None

        self.__last_routine__: List[Point] = []
        self.__lock__ = Lock()

    def clear_routine(self):
        """
        清除阻塞队列中的所有点
        """
        with self.__lock__:
            logging.debug("clear routine, before size is %s", self.routine.qsize())
            return self.routine.queue.clear()

    def update_routine(self, routine: Iterable[Point]):
        """
        更新阻塞队列中的路线:删除原有点并添加新的点
        """
        routine = list(routine)
        logging.debug("%s -> %s", self.__last_routine__, routine)
        self.clear_routine()
        for point in routine:
            self.routine.put(point)
        self.__last_routine__ = list(routine)

    def stop_moving_by_routine(self):
        """
        停止移动
        """
        self.__state__ = "stopped"
        logging.debug("%s is stopped", self)

    def pause_moving_by_routine(self):
        self.__state__ = "paused"
        logging.debug("%s is paused", self)
        self.__control_queue__.queue.clear()

    def restore_moving_by_routine(self):
        """
        恢复暂停的进度
        """
        if not self.is_paused:
            logging.warning("%s is not paused, can't restore", self)
            return
        self.__state__ = "active"
        self.__control_queue__.put(self.SIGNAL_RESTORE)
        logging.debug("%s is restored", self)

    def is_routine_empty(self) -> bool:
        return self.routine.empty()

    def wait_routine_empty(self, interval: float = 1):
        """
        等待移动完成
        """
        while not self.is_routine_empty():
            logging.debug(
                "%s is waiting for routine is empty, now size is %s",
                self.__class__.__name__,
                self.routine.qsize(),
            )
            yield from self.wait(interval)

    @abstractmethod
    def wait(self, time: float = 0):
        pass

    @abstractmethod
    def process(self, generator: Generator):
        pass

    @property
    def is_paused(self) -> bool:
        return self.__state__ == "paused"

    @property
    def is_stopped(self) -> bool:
        return self.__state__ == "stopped"

    @property
    def is_active(self) -> bool:
        return self.__state__ == "active"

    def _get_next_point(self, **kwargs) -> Point:
        """
        获取下一个点
        """
        if self.is_paused:
            return self.__control_queue__.get(**kwargs)
        return self.routine.get(**kwargs)

    def start_moving_by_routine(
        self,
        until_target: bool = False,
        until_empty: bool = False,
        wait_interval: float = 3,
        **kwargs,
    ):
        """
        按照阻塞队列中的路线移动
        """
        assert self.is_active is not True, "already moving by routine"
        logging.debug("%s start moving by routine", self)

        self.__state__ = "active"
        # TODO: 加锁?
        # 调用此方法即是强制开启移动,是否正在移动中由调用方判断
        last_point = None
        # 在运行过程中外部调用可能停止此过程
        while not (self.is_stopped):
            if until_empty and self.routine.empty():
                logging.debug("%s routine is empty, will stop", self)
                break
            try:
                next_point = self._get_next_point()
                if next_point == self.SIGNAL_RESTORE:
                    continue
            except:
                logging.debug("%s is waiting for next point", self)
                yield from self.wait(wait_interval)
                continue
            if self.is_stopped:
                break
            logging.debug(
                "%s last is %s,current is %s,next is %s",
                self,
                last_point,
                self.get_current_location(),
                next_point,
            )
            if last_point is not None and last_point != self.get_current_location():
                logging.warning(
                    "%s!=%s,who update current_location?",
                    last_point,
                    self.get_current_location(),
                )
            yield self.process(self._managed_move_(end=next_point, **kwargs))
            if (
                until_target
                and self.get_current_location() == self.get_target_location()
            ):
                break
            last_point = next_point

        self.stop_moving_by_routine()
        logging.info("%s stop moving by routine", self)

SIGNAL_RESTORE class-attribute instance-attribute

SIGNAL_RESTORE = 'restore'

__control_queue__ instance-attribute

__control_queue__ = Queue()

__last_routine__ instance-attribute

__last_routine__: List[Point] = []

__lock__ instance-attribute

__lock__ = Lock()

__state__ instance-attribute

__state__: Optional[Literal['paused', 'stopped', 'active']] = None

is_active property

is_active: bool

is_paused property

is_paused: bool

is_stopped property

is_stopped: bool

routine instance-attribute

routine: Queue[Point] = Queue()

__init__

__init__(*args, **kwargs)
源代码位于: logis/biz/sim/transport/__init__.py
def __init__(self, *args, **kwargs):
    self.__control_queue__ = Queue()
    self.routine: Queue[Point] = Queue()
    self.__state__: Optional[Literal["paused", "stopped", "active"]] = None

    self.__last_routine__: List[Point] = []
    self.__lock__ = Lock()

clear_routine

clear_routine()

清除阻塞队列中的所有点

源代码位于: logis/biz/sim/transport/__init__.py
def clear_routine(self):
    """
    清除阻塞队列中的所有点
    """
    with self.__lock__:
        logging.debug("clear routine, before size is %s", self.routine.qsize())
        return self.routine.queue.clear()

get_current_location abstractmethod

get_current_location() -> Point
源代码位于: logis/biz/sim/transport/__init__.py
@abstractmethod
def get_current_location(self) -> Point:
    pass

get_target_location abstractmethod

get_target_location() -> Point
源代码位于: logis/biz/sim/transport/__init__.py
@abstractmethod
def get_target_location(self) -> Point:
    pass

is_routine_empty

is_routine_empty() -> bool
源代码位于: logis/biz/sim/transport/__init__.py
def is_routine_empty(self) -> bool:
    return self.routine.empty()

pause_moving_by_routine

pause_moving_by_routine()
源代码位于: logis/biz/sim/transport/__init__.py
def pause_moving_by_routine(self):
    self.__state__ = "paused"
    logging.debug("%s is paused", self)
    self.__control_queue__.queue.clear()

process abstractmethod

process(generator: Generator)
源代码位于: logis/biz/sim/transport/__init__.py
@abstractmethod
def process(self, generator: Generator):
    pass

restore_moving_by_routine

restore_moving_by_routine()

恢复暂停的进度

源代码位于: logis/biz/sim/transport/__init__.py
def restore_moving_by_routine(self):
    """
    恢复暂停的进度
    """
    if not self.is_paused:
        logging.warning("%s is not paused, can't restore", self)
        return
    self.__state__ = "active"
    self.__control_queue__.put(self.SIGNAL_RESTORE)
    logging.debug("%s is restored", self)

start_moving_by_routine

start_moving_by_routine(until_target: bool = False, until_empty: bool = False, wait_interval: float = 3, **kwargs)

按照阻塞队列中的路线移动

源代码位于: logis/biz/sim/transport/__init__.py
def start_moving_by_routine(
    self,
    until_target: bool = False,
    until_empty: bool = False,
    wait_interval: float = 3,
    **kwargs,
):
    """
    按照阻塞队列中的路线移动
    """
    assert self.is_active is not True, "already moving by routine"
    logging.debug("%s start moving by routine", self)

    self.__state__ = "active"
    # TODO: 加锁?
    # 调用此方法即是强制开启移动,是否正在移动中由调用方判断
    last_point = None
    # 在运行过程中外部调用可能停止此过程
    while not (self.is_stopped):
        if until_empty and self.routine.empty():
            logging.debug("%s routine is empty, will stop", self)
            break
        try:
            next_point = self._get_next_point()
            if next_point == self.SIGNAL_RESTORE:
                continue
        except:
            logging.debug("%s is waiting for next point", self)
            yield from self.wait(wait_interval)
            continue
        if self.is_stopped:
            break
        logging.debug(
            "%s last is %s,current is %s,next is %s",
            self,
            last_point,
            self.get_current_location(),
            next_point,
        )
        if last_point is not None and last_point != self.get_current_location():
            logging.warning(
                "%s!=%s,who update current_location?",
                last_point,
                self.get_current_location(),
            )
        yield self.process(self._managed_move_(end=next_point, **kwargs))
        if (
            until_target
            and self.get_current_location() == self.get_target_location()
        ):
            break
        last_point = next_point

    self.stop_moving_by_routine()
    logging.info("%s stop moving by routine", self)

stop_moving_by_routine

stop_moving_by_routine()

停止移动

源代码位于: logis/biz/sim/transport/__init__.py
def stop_moving_by_routine(self):
    """
    停止移动
    """
    self.__state__ = "stopped"
    logging.debug("%s is stopped", self)

update_routine

update_routine(routine: Iterable[Point])

更新阻塞队列中的路线:删除原有点并添加新的点

源代码位于: logis/biz/sim/transport/__init__.py
def update_routine(self, routine: Iterable[Point]):
    """
    更新阻塞队列中的路线:删除原有点并添加新的点
    """
    routine = list(routine)
    logging.debug("%s -> %s", self.__last_routine__, routine)
    self.clear_routine()
    for point in routine:
        self.routine.put(point)
    self.__last_routine__ = list(routine)

wait abstractmethod

wait(time: float = 0)
源代码位于: logis/biz/sim/transport/__init__.py
@abstractmethod
def wait(self, time: float = 0):
    pass

wait_routine_empty

wait_routine_empty(interval: float = 1)

等待移动完成

源代码位于: logis/biz/sim/transport/__init__.py
def wait_routine_empty(self, interval: float = 1):
    """
    等待移动完成
    """
    while not self.is_routine_empty():
        logging.debug(
            "%s is waiting for routine is empty, now size is %s",
            self.__class__.__name__,
            self.routine.qsize(),
        )
        yield from self.wait(interval)

ObstacleDetector

障碍检测器

源代码位于: logis/biz/sim/transport/__init__.py
@runtime_checkable
class ObstacleDetector(Protocol):
    """
    障碍检测器
    """

    def detect_obstacles(
        self, config: Optional[ObstacleDetectorConfig] = None, *args, **kwargs
    ) -> ObstacleDetectorOutput:
        pass

detect_obstacles

detect_obstacles(config: Optional[ObstacleDetectorConfig] = None, *args, **kwargs) -> ObstacleDetectorOutput
源代码位于: logis/biz/sim/transport/__init__.py
def detect_obstacles(
    self, config: Optional[ObstacleDetectorConfig] = None, *args, **kwargs
) -> ObstacleDetectorOutput:
    pass

ObstacleDetectorConfig

障碍检测配置

源代码位于: logis/biz/sim/transport/__init__.py
class ObstacleDetectorConfig(BaseModel):
    """
    障碍检测配置
    """

    agent_as_obstacle: bool = False
    rack_as_obstacle: bool = False
    end_point_as_obstacle: bool = False

agent_as_obstacle class-attribute instance-attribute

agent_as_obstacle: bool = False

end_point_as_obstacle class-attribute instance-attribute

end_point_as_obstacle: bool = False

rack_as_obstacle class-attribute instance-attribute

rack_as_obstacle: bool = False

ObstacleDetectorOutput

源代码位于: logis/biz/sim/transport/__init__.py
class ObstacleDetectorOutput(BaseModel):

    obstacles: List[Point] = Field(default_factory=list)

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

obstacles class-attribute instance-attribute

obstacles: List[Point] = Field(default_factory=list)

PathFindingAlgorithm

寻路算法

源代码位于: logis/alg/path_finding/base.py
class PathFindingAlgorithm(metaclass=ABCMeta):
    """
    寻路算法
    """

    type: Union[PathFindingAlgorithmType, str]

    @abstractmethod
    def find_path(self, input: "PathFindingInput", **kwargs) -> "PathFindingOutput":
        """
        寻找从start到end的路径
        """
        pass

type instance-attribute

type: Union[PathFindingAlgorithmType, str]

find_path abstractmethod

find_path(input: PathFindingInput, **kwargs) -> PathFindingOutput

寻找从start到end的路径

源代码位于: logis/alg/path_finding/base.py
@abstractmethod
def find_path(self, input: "PathFindingInput", **kwargs) -> "PathFindingOutput":
    """
    寻找从start到end的路径
    """
    pass

Speed

源代码位于: logis/data_type/unitable.py
class Speed(QuantifiedValue):
    pass

Time

时间

源代码位于: logis/data_type/unitable.py
class Time(QuantifiedValue):
    """
    时间
    """

    pass

default_algorithm_matcher

default_algorithm_matcher(alg_name: str, **space) -> PathFindingAlgorithmType

默认算法匹配器 根据输入的类型匹配对应的寻路算法 如果找不到,则返回默认的A*算法

源代码位于: logis/alg/path_finding/base.py
def default_algorithm_matcher(alg_name: str, **space) -> PathFindingAlgorithmType:
    """
    默认算法匹配器
    根据输入的类型匹配对应的寻路算法
    如果找不到,则返回默认的A*算法
    """
    if alg_name == "default":
        return A_STAR_ALGORITHM

    for _, v in (space or globals()).items():
        if isinstance(v, str) and v == alg_name:
            return v
    return A_STAR_ALGORITHM

find_algorithm

find_algorithm(resolver: AlgorithmTypeResolver, space: Optional[Dict] = None, **kwargs) -> Type[PathFindingAlgorithm]

根据解析器和参数获取对应的寻路算法类

参数:

名称 类型 描述 默认
resolver AlgorithmTypeResolver

解析器函数

必需
kwargs

解析器需要的参数

{}
源代码位于: logis/alg/path_finding/base.py
def find_algorithm(
    resolver: AlgorithmTypeResolver, space: Optional[Dict] = None, **kwargs
) -> Type[PathFindingAlgorithm]:
    """
    根据解析器和参数获取对应的寻路算法类

    Args:
        resolver: 解析器函数
        kwargs: 解析器需要的参数
    """
    type = resolver(**kwargs)
    return get_algorithm(type, **space)