跳转至

数据结构

AgentId module-attribute

AgentId: TypeAlias = Union[int, str]

ComponentId module-attribute

ComponentId: TypeAlias = Union[ComponentIntId, ComponentStrId]

DEFAULT_PYDANTIC_MODEL_CONFIG module-attribute

DEFAULT_PYDANTIC_MODEL_CONFIG = ConfigDict(strict=False, arbitrary_types_allowed=True, extra='ignore', validate_by_alias=True, validate_by_name=True, coerce_numbers_to_str=True, alias_generator=camelize, populate_by_name=True)

Location module-attribute

Location: TypeAlias = Union[ICell, IRack, IRackGroup, IShapeBlueprint]

LocationType module-attribute

LocationType = Union[Point, Storable]

MetricModelType module-attribute

MetricModelType = TypeVar('MetricModelType', bound=MetricModel)

T module-attribute

T = TypeVar('T')

TmpId module-attribute

TmpId: TypeAlias = Union[int, str]

__doc__ module-attribute

__doc__ = '\n仿真模块\n'

BlueprintKind

蓝图的类型,如果不够用,可以修改此类或者继承并添加新的类型

源代码位于: logis/biz/sim/data_type/component.py
class BlueprintKind(enum.Enum):
    """
    Kinds of blueprint.

    If the existing kinds are not enough, modify this class (the original
    note also suggests inheriting from it and adding new kinds).

    NOTE(review): Python does not allow subclassing an Enum that already
    defines members, so extending by inheritance will not work as written;
    new kinds have to be added to this class directly.
    """

    CODE = "code"
    DUMMY = "dummy"
    SOURCE = "source"
    ENTER = "enter"
    EXIT = "exit"
    SINK = "sink"
    ASSEMBLER = "assembler"
    DELAY = "delay"
    EVENT = "event"
    GOODS_TO_PERSON = "goods_to_person"
    PRODUCER = "producer"
    WORK_STATION = "work_station"
    # Originally there were resource pools and rack groups; it later turned out
    # both can be described as a resource set. TODO: consider fully unifying them.
    RESOURCE_SET = "resource_set"
    RESOURCE_POOL = "resource_pool"
    TRANSPORT = "transport"

    RACK_SYSTEM = "rack_system"
    RACK = "rack"
    AGV_RACK = "agv_rack"
    PALLET_RACK = "pallet_rack"
    PICKING_RACK = "picking_rack"
    ASRS = "asrs"
    REPOSITORY = "repository"

    CONVEYOR_BELT = "conveyor_belt"
    TURNING_STATION = "turning_station"
    PATH = "path"
    GRID = "grid"
    POINT_NODE = "point_node"
    RECTANGULAR_NODE = "rectangular_node"

    AGENT = "agent"
    AGV = "agv"
    STOCK = "stock"
    FORKLIFT = "forklift"

    SHAPE = "shape"

AGENT class-attribute instance-attribute

AGENT = 'agent'

AGV class-attribute instance-attribute

AGV = 'agv'

AGV_RACK class-attribute instance-attribute

AGV_RACK = 'agv_rack'

ASRS class-attribute instance-attribute

ASRS = 'asrs'

ASSEMBLER class-attribute instance-attribute

ASSEMBLER = 'assembler'

CODE class-attribute instance-attribute

CODE = 'code'

CONVEYOR_BELT class-attribute instance-attribute

CONVEYOR_BELT = 'conveyor_belt'

DELAY class-attribute instance-attribute

DELAY = 'delay'

DUMMY class-attribute instance-attribute

DUMMY = 'dummy'

ENTER class-attribute instance-attribute

ENTER = 'enter'

EVENT class-attribute instance-attribute

EVENT = 'event'

EXIT class-attribute instance-attribute

EXIT = 'exit'

FORKLIFT class-attribute instance-attribute

FORKLIFT = 'forklift'

GOODS_TO_PERSON class-attribute instance-attribute

GOODS_TO_PERSON = 'goods_to_person'

GRID class-attribute instance-attribute

GRID = 'grid'

PALLET_RACK class-attribute instance-attribute

PALLET_RACK = 'pallet_rack'

PATH class-attribute instance-attribute

PATH = 'path'

PICKING_RACK class-attribute instance-attribute

PICKING_RACK = 'picking_rack'

POINT_NODE class-attribute instance-attribute

POINT_NODE = 'point_node'

PRODUCER class-attribute instance-attribute

PRODUCER = 'producer'

RACK class-attribute instance-attribute

RACK = 'rack'

RACK_SYSTEM class-attribute instance-attribute

RACK_SYSTEM = 'rack_system'

RECTANGULAR_NODE class-attribute instance-attribute

RECTANGULAR_NODE = 'rectangular_node'

REPOSITORY class-attribute instance-attribute

REPOSITORY = 'repository'

RESOURCE_POOL class-attribute instance-attribute

RESOURCE_POOL = 'resource_pool'

RESOURCE_SET class-attribute instance-attribute

RESOURCE_SET = 'resource_set'

SHAPE class-attribute instance-attribute

SHAPE = 'shape'

SINK class-attribute instance-attribute

SINK = 'sink'

SOURCE class-attribute instance-attribute

SOURCE = 'source'

STOCK class-attribute instance-attribute

STOCK = 'stock'

TRANSPORT class-attribute instance-attribute

TRANSPORT = 'transport'

TURNING_STATION class-attribute instance-attribute

TURNING_STATION = 'turning_station'

WORK_STATION class-attribute instance-attribute

WORK_STATION = 'work_station'

ComponentConfig

组件配置

源代码位于: logis/biz/sim/data_type/component.py
class ComponentConfig(BaseModel):
    """
    Component configuration.

    Holds a list of ComponentConfigItem entries; the list defaults to empty.
    """

    entity_list: List[ComponentConfigItem] = Field(default_factory=list)

entity_list class-attribute instance-attribute

entity_list: List[ComponentConfigItem] = Field(default_factory=list)

ComponentConfigItem

组件配置项,用于前端展示和后端处理

源代码位于: logis/biz/sim/data_type/component.py
class ComponentConfigItem(BaseModel):
    """
    Component configuration item, used for frontend display and backend processing.

    Only ``type_id`` is required; every other field has a default.
    """

    # Chinese display name
    label: Optional[str] = None
    # Description
    description: Optional[str] = None
    # Type code
    type_id: str
    # Loader, used to guide the backend in parsing the component
    loader: Optional[str] = None
    # Whether to unload; used to unload a component that was already loaded
    unload: bool = False

    # Whether this configuration is enabled
    enabled: bool = True

description class-attribute instance-attribute

description: Optional[str] = None

enabled class-attribute instance-attribute

enabled: bool = True

label class-attribute instance-attribute

label: Optional[str] = None

loader class-attribute instance-attribute

loader: Optional[str] = None

type_id instance-attribute

type_id: str

unload class-attribute instance-attribute

unload: bool = False

ComponentLoader

组件加载器、解析器

源代码位于: logis/biz/sim/iface/component.py
@runtime_checkable
class ComponentLoader(Protocol):
    """
    Component loader / parser.

    Declared as a runtime-checkable Protocol, so implementers are matched
    structurally (by providing a ``load`` method).
    """

    def load(self, item: ComponentConfigItem) -> Type[IComponent]:
        """
        Load a component.

        Args:
            item (ComponentConfigItem): the component configuration item

        Returns:
            Type[IComponent]: the component class

        Raises:
            NotImplementedError: always, in this protocol default body.
        """
        raise NotImplementedError("load method not implemented")

load

load(item: ComponentConfigItem) -> Type[IComponent]

加载组件

参数:

名称 类型 描述 默认
item ComponentConfigItem

组件配置项

必需

返回:

类型 描述
Type[IComponent]

Type[IComponent]: 组件类

源代码位于: logis/biz/sim/iface/component.py
def load(self, item: ComponentConfigItem) -> Type[IComponent]:
    """
    Load a component.

    Args:
        item (ComponentConfigItem): the component configuration item

    Returns:
        Type[IComponent]: the component class

    Raises:
        NotImplementedError: always, in this default body.
    """
    raise NotImplementedError("load method not implemented")

IBlueprintRegistry

蓝图注册中心

源代码位于: logis/biz/sim/iface/registry.py
class IBlueprintRegistry(IRegistry[IBlueprint]):
    """
    Blueprint registry.

    Specializes IRegistry for IBlueprint; adds no members of its own here.
    """

IComponent

类型标识。历史逻辑区分蓝图和空间标记两种概念,为了统一衍生出此接口;但后续在改造的过程中,逐步把空间标记也视为蓝图,因此此接口似乎就没太多必要,不过作为更抽象的接口,还是保留着。

源代码位于: logis/biz/sim/iface/component.py
class IComponent(ABC):
    """
    Type marker.

    Historically the logic distinguished two concepts, blueprints and spatial
    markers, and this interface was derived to unify them. During later
    refactoring spatial markers gradually came to be treated as blueprints as
    well, so this interface no longer seems very necessary; it is kept anyway
    as a more abstract interface.
    """

    def __init__(self, *args, **kwargs):
        # Accepts and ignores any arguments.
        pass

    def is_valid(self):
        """
        Tell whether the component is valid.

        This base implementation always returns True.
        """
        return True

    def previous_nodes(self, direct: bool = True, **kwargs) -> Iterable[Self]:
        """
        Get the upstream nodes.

        Args:
            direct (bool, optional): whether to return only the direct
                upstream nodes. Defaults to True.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("previous_nodes method not implemented")

    def next_nodes(self, direct: bool = True, **kwargs) -> Iterable[Self]:
        """
        Get the downstream nodes.

        Args:
            direct (bool, optional): whether to return only the direct
                downstream nodes. Defaults to True.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("next_nodes method not implemented")

__init__

__init__(*args, **kwargs)
源代码位于: logis/biz/sim/iface/component.py
def __init__(self, *args, **kwargs):
    # Accepts and ignores any positional or keyword arguments.
    pass

is_valid

is_valid()

判断组件是否有效

源代码位于: logis/biz/sim/iface/component.py
def is_valid(self):
    """
    Tell whether the component is valid.

    This base implementation always returns True.
    """
    return True

next_nodes

next_nodes(direct: bool = True, **kwargs) -> Iterable[Self]

获取下游节点

参数:

名称 类型 描述 默认
direct bool

是否只获取直接下游节点。默认值为True。

True
源代码位于: logis/biz/sim/iface/component.py
def next_nodes(self, direct: bool = True, **kwargs) -> Iterable[Self]:
    """
    Get the downstream nodes.

    Args:
        direct (bool, optional): whether to return only the direct
            downstream nodes. Defaults to True.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError("next_nodes method not implemented")

previous_nodes

previous_nodes(direct: bool = True, **kwargs) -> Iterable[Self]

获取上游节点

参数:

名称 类型 描述 默认
direct bool

是否只获取直接上游节点。默认值为True。

True
源代码位于: logis/biz/sim/iface/component.py
def previous_nodes(self, direct: bool = True, **kwargs) -> Iterable[Self]:
    """
    Get the upstream nodes.

    Args:
        direct (bool, optional): whether to return only the direct
            upstream nodes. Defaults to True.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError("previous_nodes method not implemented")

IDataReport

源代码位于: logis/biz/sim/iface/base.py
class IDataReport(ABC):
    """Marker base class (data report, judging by the name); declares no members here."""

    pass

IExcelParser

源代码位于: logis/biz/sim/iface/base.py
class IExcelParser(ABC):
    """Marker base class (Excel parser, judging by the name); declares no members here."""

    pass

IExpose

用于通过SDK向外暴露参数

源代码位于: logis/biz/sim/iface/base.py
class IExpose(ABC):
    """
    Used to expose parameters to the outside through the SDK.
    """

    def __init__(self, ctx: "Context", **kwargs) -> None:
        # Only the context is stored; extra keyword arguments are accepted and ignored.
        self.ctx = ctx

ctx instance-attribute

ctx = ctx

__init__

__init__(ctx: Context, **kwargs) -> None
源代码位于: logis/biz/sim/iface/base.py
def __init__(self, ctx: "Context", **kwargs) -> None:
    # Only the context is stored; extra keyword arguments are accepted and ignored.
    self.ctx = ctx

IJsonParser

源代码位于: logis/biz/sim/iface/base.py
class IJsonParser(ABC):
    """
    Base class for JSON parsers.

    Holds the parse results as instance attributes, all of which start as
    None: ``logic_graph``, ``_produce_recipe_graph`` and ``object_map``.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.logic_graph: Optional[DiGraph] = None
        self._produce_recipe_graph: Optional[DiGraph] = None
        # TODO: consider using id_instance_map instead
        self.object_map: Optional[Dict[str, IBlueprint]] = None

    @property
    def id_instance_map(self) -> Dict[str, "IBlueprint"]:
        """
        Return ``object_map``, requiring that it has been populated.

        Raises:
            AssertionError: if ``object_map`` is still None (nothing parsed yet).
        """
        # Explicit check instead of a bare ``assert``: it still runs under
        # ``python -O`` and lets through a parsed-but-empty map, which the
        # old truthiness test rejected. The exception type is kept for
        # callers that catch AssertionError.
        if self.object_map is None:
            raise AssertionError("请先解析JSON文件")
        return self.object_map

id_instance_map property

id_instance_map: Dict[str, IBlueprint]

logic_graph instance-attribute

logic_graph: Optional[DiGraph] = None

object_map instance-attribute

object_map: Optional[Dict[str, IBlueprint]] = None

__init__

__init__(*args, **kwargs) -> None
源代码位于: logis/biz/sim/iface/base.py
def __init__(self, *args, **kwargs) -> None:
    # Forward all arguments up the MRO, then start with every parse result unset.
    super().__init__(*args, **kwargs)
    self.logic_graph: Optional[DiGraph] = None
    self._produce_recipe_graph: Optional[DiGraph] = None
    # TODO: consider using id_instance_map instead
    self.object_map: Optional[Dict[str, IBlueprint]] = None

ILocationGetter

位置获取器接口

源代码位于: logis/biz/sim/__init__.py
class ILocationGetter(IExpose):
    """
    Location getter interface.
    """

    @abstractmethod
    def get(self, **kwargs) -> Optional[Location]:
        """
        Get a location.

        Returns:
            A Location, or None (the return type is Optional).
        """
        pass

get

get(**kwargs) -> Optional[Location]

获取位置

源代码位于: logis/biz/sim/__init__.py
@abstractmethod
def get(self, **kwargs) -> Optional[Location]:
    """
    Get a location.

    Returns:
        A Location, or None (the return type is Optional).
    """
    pass

IOrder

源代码位于: logis/biz/sim/iface/order.py
class IOrder(ABC):
    """
    Order interface.

    ``get_priority``, ``set_wave_id`` and ``get_order_time`` are abstract;
    ``get_wave_id`` has a default implementation that returns None.
    """

    @abstractmethod
    def get_priority(self) -> int:
        """
        Order priority; a smaller number means a higher priority.
        """
        pass

    def get_wave_id(self) -> Union[str, int, None]:
        """
        Wave ID.

        This default implementation always returns None.
        """
        return None

    @abstractmethod
    def set_wave_id(self, wave_id: Union[str, int, None]):
        """
        Set the wave ID.
        """
        pass

    @abstractmethod
    def get_order_time(self) -> datetime:
        """
        Order time.
        """
        pass

get_order_time abstractmethod

get_order_time() -> datetime

订单时间

源代码位于: logis/biz/sim/iface/order.py
@abstractmethod
def get_order_time(self) -> datetime:
    """
    Order time.
    """
    pass

get_priority abstractmethod

get_priority() -> int

订单优先级,数字越小优先级越高

源代码位于: logis/biz/sim/iface/order.py
@abstractmethod
def get_priority(self) -> int:
    """
    Order priority; a smaller number means a higher priority.
    """
    pass

get_wave_id

get_wave_id() -> Union[str, int, None]

波次ID

源代码位于: logis/biz/sim/iface/order.py
def get_wave_id(self) -> Union[str, int, None]:
    """
    Wave ID.

    This default implementation always returns None.
    """
    return None

set_wave_id abstractmethod

set_wave_id(wave_id: Union[str, int, None])

设置波次ID

源代码位于: logis/biz/sim/iface/order.py
@abstractmethod
def set_wave_id(self, wave_id: Union[str, int, None]):
    """
    Set the wave ID.
    """
    pass

IOrderManager

源代码位于: logis/biz/sim/iface/base.py
class IOrderManager(ABC):
    """Marker base class (order manager, judging by the name); declares no members here."""

    pass

IResourceManager

源代码位于: logis/biz/sim/iface/base.py
class IResourceManager(ABC):
    """Marker base class (resource manager, judging by the name); declares no members here."""

    pass

IResultGenerator

源代码位于: logis/biz/sim/iface/base.py
class IResultGenerator(ABC):
    """Marker base class (result generator, judging by the name); declares no members here."""

    pass

ISimProxy

仿真模块接口

源代码位于: logis/biz/sim/iface/proxy.py
class ISimProxy(ABC):
    """
    Simulation module interface.

    Subclasses supply the abstract properties (parsers, graphs, contexts,
    setup arguments); the concrete properties and helpers below are derived
    from those.
    """

    @property
    @abstractmethod
    def data_report(self) -> Optional["IDataReport"]:
        pass

    @property
    @abstractmethod
    def log(self) -> logging.Logger:
        pass

    @property
    @abstractmethod
    def json_parser(self) -> Optional["IJsonParser"]:
        pass

    @property
    @abstractmethod
    def result_generator(self) -> Optional["IResultGenerator"]:
        """Get the result generator (must be implemented by subclasses)."""
        pass

    @property
    @abstractmethod
    def storage_manager(self) -> Optional["IStorageManager"]:
        """Get the storage manager (must be implemented by subclasses)."""
        pass

    @property
    @abstractmethod
    def network(self) -> DiGraph:
        """Get the network module (must be implemented by subclasses)."""
        pass

    @property
    def env(self) -> simpy.Environment:
        # Requires an initialized simulation context; the environment lives on it.
        assert self.sim_ctx is not None, "sim_ctx is None"
        return self.sim_ctx.env

    @property
    def sim_id(self) -> str:
        return self.context.sim_id()

    @property
    def context(self) -> Type["Context"]:
        # Imported inside the property rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from ..ctx import Context

        # Returns the Context class itself, not an instance.
        return Context

    @property
    @abstractmethod
    def task_graph(self) -> TaskGraph:
        """Get the task graph (must be implemented by subclasses)."""
        pass

    @property
    @abstractmethod
    def rack_graph(self) -> DiGraph:
        """Get the rack graph (must be implemented by subclasses)."""
        pass

    @property
    @abstractmethod
    def sim_ctx(self) -> Optional["SimContext"]:
        """Get the simulation context (must be implemented by subclasses)."""
        pass

    @property
    def debug(self) -> bool:
        """Get the debug-mode flag (read from ``setup_args.debug``)."""
        return self.setup_args.debug

    @property
    @abstractmethod
    def setup_args(self) -> "SetupArgs":
        """Get the startup arguments (must be implemented by subclasses)."""
        pass

    def start_timing_if_not(self, key: str):
        """
        Start timing from now if timing has not been started yet.

        Delegates to ``Context.start_timing_if_not`` with the current
        simulation time (``self.env.now``).

        Args:
            key: the duration key
        """
        from ..ctx import Context

        return Context.start_timing_if_not(key, now=self.env.now)

    def stop_timing(self, key: str):
        """
        Stop timing.

        Delegates to ``Context.stop_timing`` with the current simulation
        time (``self.env.now``).

        Args:
            key: the duration key
        """
        from ..ctx import Context

        return Context.stop_timing(key, now=self.env.now)

    @abstractmethod
    def collect_metric(self, metric: MetricModelType):
        """
        Collect metric data (must be implemented by subclasses).

        Args:
            metric: the metric model
        """
        pass

    @property
    def logic_graph(self) -> Optional[DiGraph]:
        """Logic graph taken from json_parser; None when there is no parser."""
        x = self.json_parser
        return x.logic_graph if x else None

    @property
    def produce_recipe_graph(self) -> Optional[DiGraph]:
        """Production recipe graph taken from json_parser; None when there is no parser."""
        x = self.json_parser
        # Reads the parser's underscore-prefixed attribute directly.
        return x._produce_recipe_graph if x else None

    @property
    def id_instance_map(self) -> Optional[Dict[str, Any]]:
        """ID-to-instance mapping taken from json_parser; None when there is no parser."""
        x = self.json_parser
        # Reads ``object_map`` directly, so it may be None even when a parser exists.
        return x.object_map if x else None

    @property
    def is_sim_over(self):
        """
        Tell whether the simulation is over, whether or not it ended normally.
        """
        ctx = self.sim_ctx
        assert ctx, "仿真上下文未初始化"
        return ctx.is_over()

    @property
    def is_sim_terminated(self):
        """
        Tell whether the simulation has been terminated.
        """
        ctx = self.sim_ctx
        assert ctx, "仿真上下文未初始化"
        return ctx.is_terminated()

    @property
    def ws_conn(self):
        """Get the WebSocket connection (default implementation based on sim_ctx; subclasses may override)."""
        ctx = self.sim_ctx
        return ctx.ws_conn if ctx else None

    @property
    def event_loop(self):
        # Event loop held by the simulation context, or None when there is no context.
        ctx = self.sim_ctx
        return ctx.event_loop if ctx else None

context property

context: Type[Context]

data_report abstractmethod property

data_report: Optional[IDataReport]

debug property

debug: bool

获取调试模式标识

env property

env: Environment

event_loop property

event_loop

id_instance_map property

id_instance_map: Optional[Dict[str, Any]]

基于 json_parser 实现的ID实例映射

is_sim_over property

is_sim_over

判断仿真是否已结束,无论是否正常结束

is_sim_terminated property

is_sim_terminated

判断仿真是否已终止

json_parser abstractmethod property

json_parser: Optional[IJsonParser]

log abstractmethod property

log: Logger

logic_graph property

logic_graph: Optional[DiGraph]

基于 json_parser 实现的逻辑图

network abstractmethod property

network: DiGraph

获取网络模块(子类需实现)

produce_recipe_graph property

produce_recipe_graph: Optional[DiGraph]

基于 json_parser 实现的生产配方图

rack_graph abstractmethod property

rack_graph: DiGraph

获取机架图(子类需实现)

result_generator abstractmethod property

result_generator: Optional[IResultGenerator]

获取结果生成器(子类需实现)

setup_args abstractmethod property

setup_args: SetupArgs

获取启动参数(子类需实现)

sim_ctx abstractmethod property

sim_ctx: Optional[SimContext]

获取仿真上下文(子类需实现)

sim_id property

sim_id: str

storage_manager abstractmethod property

storage_manager: Optional[IStorageManager]

获取存储管理器(子类需实现)

task_graph abstractmethod property

task_graph: TaskGraph

获取任务图(子类需实现)

ws_conn property

ws_conn

获取WebSocket连接(基于 sim_ctx 的默认实现,子类可重写)

collect_metric abstractmethod

collect_metric(metric: MetricModelType)

收集指标数据(子类需实现)

参数:

名称 类型 描述 默认
metric MetricModelType

指标模型

必需
源代码位于: logis/biz/sim/iface/proxy.py
@abstractmethod
def collect_metric(self, metric: MetricModelType):
    """
    Collect metric data (must be implemented by subclasses).

    Args:
        metric: the metric model
    """
    pass

start_timing_if_not

start_timing_if_not(key: str)

如果还没有开始计时就从现在开始计时

参数:

名称 类型 描述 默认
key str

时长键

必需
源代码位于: logis/biz/sim/iface/proxy.py
def start_timing_if_not(self, key: str):
    """
    Start timing from now if timing has not been started yet.

    Delegates to ``Context.start_timing_if_not`` with the current
    simulation time (``self.env.now``) and returns its result.

    Args:
        key: the duration key
    """
    from ..ctx import Context

    return Context.start_timing_if_not(key, now=self.env.now)

stop_timing

stop_timing(key: str)

停止计时

参数:

名称 类型 描述 默认
key str

时长键

必需
源代码位于: logis/biz/sim/iface/proxy.py
def stop_timing(self, key: str):
    """
    Stop timing.

    Delegates to ``Context.stop_timing`` with the current simulation time
    (``self.env.now``) and returns its result.

    Args:
        key: the duration key
    """
    from ..ctx import Context

    return Context.stop_timing(key, now=self.env.now)

IStorageManager

源代码位于: logis/biz/sim/iface/base.py
class IStorageManager(ABC):
    """Marker base class (storage manager, judging by the name); declares no members here."""

    pass

IUnitManager

源代码位于: logis/biz/sim/iface/base.py
class IUnitManager(ABC):
    """
    Unit manager interface; both methods are abstract.
    """

    @abstractmethod
    def find_unit_level(
        self, stock_name: str, unit_name: str
    ) -> Optional[Literal["一级单位", "二级单位", "三级单位"]]:
        # Returns one of the three level labels (first/second/third-level unit), or None.
        pass

    @abstractmethod
    def get_unit_config(self, stock_name: str) -> UnitConfig:
        # Returns the UnitConfig for the given stock name.
        pass

find_unit_level abstractmethod

find_unit_level(stock_name: str, unit_name: str) -> Optional[Literal['一级单位', '二级单位', '三级单位']]
源代码位于: logis/biz/sim/iface/base.py
@abstractmethod
def find_unit_level(
    self, stock_name: str, unit_name: str
) -> Optional[Literal["一级单位", "二级单位", "三级单位"]]:
    # Returns one of the three level labels (first/second/third-level unit), or None.
    pass

get_unit_config abstractmethod

get_unit_config(stock_name: str) -> UnitConfig
源代码位于: logis/biz/sim/iface/base.py
@abstractmethod
def get_unit_config(self, stock_name: str) -> UnitConfig:
    # Returns the UnitConfig for the given stock name.
    pass

OldSimEvent

旧版仿真事件结构体,包含了仿真过程中产生的事件可能用到的所有字段

源代码位于: logis/biz/sim/data_type/__init__.py
class OldSimEvent(SimMetadata):
    """
    Legacy simulation event structure; contains every field that events
    produced during a simulation might use.
    """

    # Everything below is a legacy structure with all fields merged together;
    # to be gradually replaced with generics.
    # On input, also accepted under the names "agent_id" and "id".
    entity_id: Optional[TmpId] = Field(
        validation_alias=AliasChoices("entity_id", "agent_id", "id"), default=None
    )
    # Agent type
    entity_type: Optional[ComponentId] = None
    # Start position
    start_pos: Optional[List[float]] = None
    start_point: Optional[Point] = None
    # End position
    end_pos: Optional[List[float]] = None
    end_point: Optional[Point] = None
    # Duration
    duration: Optional[Union[int, float]] = None
    # FIXME: the name says shelf ID, but in practice it always stores a stock_id
    payload_id: Optional[TmpId] = Field(
        validation_alias=AliasChoices("shelf_id", "payload_id"), default=None
    )
    rack_id: Optional[TmpId] = None
    # Goods carrier ID
    carrier_id: Optional[TmpId] = None

    init_stock: Optional[bool] = None
    stock: Optional[Any] = None
    stock_id: Optional[TmpId] = None

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

carrier_id class-attribute instance-attribute

carrier_id: Optional[TmpId] = None

duration class-attribute instance-attribute

duration: Optional[Union[int, float]] = None

end_point class-attribute instance-attribute

end_point: Optional[Point] = None

end_pos class-attribute instance-attribute

end_pos: Optional[List[float]] = None

entity_id class-attribute instance-attribute

entity_id: Optional[TmpId] = Field(validation_alias=AliasChoices('entity_id', 'agent_id', 'id'), default=None)

entity_type class-attribute instance-attribute

entity_type: Optional[ComponentId] = None

init_stock class-attribute instance-attribute

init_stock: Optional[bool] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

payload_id class-attribute instance-attribute

payload_id: Optional[TmpId] = Field(validation_alias=AliasChoices('shelf_id', 'payload_id'), default=None)

rack_id class-attribute instance-attribute

rack_id: Optional[TmpId] = None

start_point class-attribute instance-attribute

start_point: Optional[Point] = None

start_pos class-attribute instance-attribute

start_pos: Optional[List[float]] = None

stock class-attribute instance-attribute

stock: Optional[Any] = None

stock_id class-attribute instance-attribute

stock_id: Optional[TmpId] = None

OrderPickingConfig

订单拣选配置项

源代码位于: logis/biz/sim/iface/picking.py
class OrderPickingConfig(BaseModel):
    """
    Order picking configuration.
    """

    # Number of orders a picker can handle at the same time
    picker_parallel_orders: Optional[int] = None

    # # How orders are split into waves
    # wave_method: Optional[Literal["订单数量", "时间段"]] = None
    # # Value for the selected order-merging method
    # value_of_wave_method: Optional[Union[UTime, int]] = None

    # Sort by order priority
    sort_by_priority: bool = False

    # Agent-only
    merge_by_sku_when_fetching: bool = True

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

merge_by_sku_when_fetching class-attribute instance-attribute

merge_by_sku_when_fetching: bool = True

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

picker_parallel_orders class-attribute instance-attribute

picker_parallel_orders: Optional[int] = None

sort_by_priority class-attribute instance-attribute

sort_by_priority: bool = False

OrderPickingStrategy

订单拣选策略:1. 合并size均为1时,退化为按单拣选;2. 合并size均>1时,即是合并拣选;3. 合并size>=1时,即是混合拣选。

源代码位于: logis/biz/sim/iface/picking.py
class OrderPickingStrategy(IExpose):
    """
    Order picking strategy.

    1. When every merge size is 1, it degenerates into pick-by-order.
    2. When every merge size is > 1, it is merged (batch) picking.
    3. When merge sizes are >= 1, it is mixed picking.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def on_merge(self, *orders: IOrder, **kwargs):
        """
        Hook called when orders are merged.

        Args:
            *orders: the orders involved; each positional argument is one
                IOrder (a ``*args`` annotation describes a single element,
                not the collected tuple).
        """
        pass

    @abstractmethod
    def merge(
        self, orders: List[IOrder], config: Union[OrderPickingConfig, None] = None
    ) -> List[IOrder]:
        """
        Order merging logic.

        Abstract, but carries a default body that subclasses can reach via
        ``super().merge(...)``: it returns the orders unchanged, or sorted by
        ascending ``get_priority()`` when ``config.sort_by_priority`` is set.

        Args:
            orders: the orders to merge
            config: the order picking configuration

        Returns:
            The merged list of orders.
        """
        if not orders:
            return []
        if config and config.sort_by_priority:
            orders = sorted(orders, key=lambda od: od.get_priority())
        return orders

__init__

__init__(**kwargs)
源代码位于: logis/biz/sim/iface/picking.py
def __init__(self, **kwargs):
    # Pure pass-through: forwards all keyword arguments to the parent initializer.
    super().__init__(**kwargs)

merge abstractmethod

merge(orders: List[IOrder], config: Union[OrderPickingConfig, None] = None) -> List[IOrder]

订单合并逻辑

参数:

名称 类型 描述 默认
orders List[IOrder]

待合并的订单列表

必需
config Union[OrderPickingConfig, None]

订单拣选配置

None

返回:

类型 描述
List[IOrder]

合并后的订单列表

源代码位于: logis/biz/sim/iface/picking.py
@abstractmethod
def merge(
    self, orders: List[IOrder], config: Union[OrderPickingConfig, None] = None
) -> List[IOrder]:
    """
    Order merging logic.

    Args:
        orders: the orders to merge
        config: the order picking configuration

    Returns:
        The merged list of orders: a new empty list for empty input, the
        input itself when no priority sort is requested, otherwise a new
        list sorted by ascending ``get_priority()``.
    """
    if not orders:
        return []
    # Without a config, or with priority sorting switched off, pass through.
    if not config or not config.sort_by_priority:
        return orders
    return sorted(orders, key=lambda order: order.get_priority())

on_merge abstractmethod

on_merge(*orders: Tuple[IOrder], **kwargs)

当订单发生合并时的钩子方法

源代码位于: logis/biz/sim/iface/picking.py
@abstractmethod
def on_merge(self, *orders: IOrder, **kwargs):
    """
    Hook called when orders are merged.

    Args:
        *orders: the orders involved; each positional argument is one IOrder
            (a ``*args`` annotation describes a single element, not the
            collected tuple).
    """
    pass

Point

此类不通用,历史遗留,不建议使用,如有需要建议使用GenericPoint。支持x,y,z三维坐标点,也可当作二维坐标使用。TODO: 处理单位

源代码位于: logis/data_type/point.py
class Point(GenericPoint[float]):
    """
    Legacy, non-generic point type; not recommended for new code -- use
    GenericPoint instead when needed.

    Supports x, y, z three-dimensional coordinates and can also be used as a
    two-dimensional point.
    TODO: handle units
    """

    __counter = count(0, step=1)
    __global_precision__ = 3

    @classmethod
    def try_parse(cls, dc: dict, **kwargs) -> "Point":
        """
        Read the "X", "Y" and "Z" keys from a dict and build a point.

        Missing keys are passed on as None. The instance is built through
        ``cls`` so that subclasses get instances of their own type.

        Raises:
            ValueError: if ``dc`` is not a dict.
        """
        if not isinstance(dc, dict):
            raise ValueError("Expected a dictionary for Point parsing.")
        xyz = [dc.get(k, None) for k in ("X", "Y", "Z")]
        return cls(xyz, **kwargs)

    @classmethod
    def from_tuple(cls, args: Tuple[Number, ...], **kwargs):
        """
        Create a point from a tuple of coordinates.
        """
        return cls(args, **kwargs)

    @classmethod
    def of(
        cls,
        x: Optional[Number] = None,
        y: Optional[Number] = None,
        z: Optional[Number] = None,
        **kwargs,
    ) -> "Point":
        """
        Create a point from separate x, y, z values.

        Built through ``cls`` (consistent with ``from_tuple``) so that
        subclasses get instances of their own type.
        """
        return cls(x, y, z, **kwargs)

    def __init__(self, *args, precision: Optional[int] = None, **kwargs):
        """
        Accepts ``[x, y]``, ``x, y``, ``[x, y, z]`` or ``x, y, z``.
        Defaults to x = y = z = 0.

        TODO: should not convert to float automatically, and should not
        round to a fixed number of decimals automatically
        """
        # Each instance takes the next value from the class-level counter.
        self._id = next(self.__counter)
        self.unit: Optional[str] = kwargs.get("unit", None)

        # Keyword coordinates are the starting point; positional ones override them.
        x: Optional[Number] = kwargs.get("x", None)
        y: Optional[Number] = kwargs.get("y", None)
        z: Optional[Number] = kwargs.get("z", None)
        if len(args) == 1:
            assert isinstance(
                args[0], (list, tuple)
            ), "single argument must be a list or tuple."
            x = cast_if_not_none(args[0][0], float)
            y = cast_if_not_none(args[0][1], float)
            if len(args[0]) > 2 and (z := args[0][2]) is not None:
                z = cast_if_not_none(z, float)

        elif len(args) >= 2:
            x = cast_if_not_none(args[0], float)
            y = cast_if_not_none(args[1], float)
            if len(args) > 2 and (z := args[2]) is not None:
                z = cast_if_not_none(z, float)

        # During debugging z was sometimes None and sometimes 0, so a default
        # is applied uniformly here. Ideally the callers would normalize this.
        x = x if x is not None else 0
        y = y if y is not None else 0
        z = z if z is not None else 0

        kwargs.update(x=x, y=y, z=z)

        super().__init__(precision=precision, **kwargs)

    def __repr__(self):
        return f"Point({self.x}, {self.y}, {self.z})"

    def __hash__(self):
        """Hash on the raw coordinates plus precision and unit."""
        # The former inner helper returned its argument unchanged on every
        # path (precision-based quantization was sketched but disabled), so
        # the coordinates are hashed directly.
        return hash((self.x, self.y, self.z, self.precision, self.unit))

    def __lt__(self, other):
        """
        Order points by ``(x, y)``.

        FIXME: take z into account
        """
        if isinstance(other, Point):
            # TODO: should it be enough for any single coordinate to be greater?
            return (self.x, self.y) < (other.x, other.y)
        return NotImplemented

    def __sub__(self, other: "Point"):
        """Component-wise difference; both points must have the same unit."""

        def sub(a: Number, b: Number):
            # Both missing -> stays missing; otherwise a missing side counts as 0.
            if a is None and b is None:
                return None
            a = a or 0
            b = b or 0
            return a - b

        assert self.unit == other.unit, "unit must be same"
        return Point.of(
            x=sub(self.x, other.x),
            y=sub(self.y, other.y),
            z=sub(self.z, other.z),
            unit=self.unit,
        )

    def to_tuple(self) -> TuplePoint:
        """Return the coordinates as an ``(x, y, z)`` tuple."""
        return (
            self.x,
            self.y,
            self.z,
        )

    def __str__(self):
        return f"Point(id={self._id},x={self.x},y={self.y},z={self.z})"

__counter class-attribute instance-attribute

__counter = count(0, step=1)

__global_precision__ class-attribute instance-attribute

__global_precision__ = 3

unit instance-attribute

unit: Optional[str] = get('unit', None)

__hash__

__hash__()
源代码位于: logis/data_type/point.py
def __hash__(self):
    """Hash on the raw coordinates plus precision and unit."""
    # The former inner helper returned its argument unchanged on every path
    # (precision-based quantization was sketched but disabled), so the
    # coordinates go into the hashed tuple as-is.
    key = (self.x, self.y, self.z, self.precision, self.unit)
    return hash(key)

__init__

__init__(*args, precision: Optional[int] = None, **kwargs)

支持[x,y]、x,y、[x,y,z]、x,y,z传参赋值;默认x=y=z=0。

TODO: 不应该自动转float、不应该自动保留两位小数

源代码位于: logis/data_type/point.py
def __init__(self, *args, precision: Optional[int] = None, **kwargs):
    """
    Accepts ``[x, y]``, ``x, y``, ``[x, y, z]`` or ``x, y, z``.
    Defaults to x = y = z = 0.

    TODO: should not convert to float automatically, and should not round
    to a fixed number of decimals automatically
    """
    # Each instance takes the next value from the class-level counter.
    self._id = next(self.__counter)
    self.unit: Optional[str] = kwargs.get("unit", None)

    # Keyword coordinates are the starting point; positional ones override them.
    x: Optional[Number] = kwargs.get("x", None)
    y: Optional[Number] = kwargs.get("y", None)
    z: Optional[Number] = kwargs.get("z", None)
    if len(args) == 1:
        # Single positional argument: a sequence holding the coordinates.
        assert isinstance(
            args[0], (list, tuple)
        ), "single argument must be a list or tuple."
        x = cast_if_not_none(args[0][0], float)
        y = cast_if_not_none(args[0][1], float)
        if len(args[0]) > 2 and (z := args[0][2]) is not None:
            z = cast_if_not_none(z, float)

    elif len(args) >= 2:
        # Two or more positional arguments: x, y and optionally z.
        x = cast_if_not_none(args[0], float)
        y = cast_if_not_none(args[1], float)
        if len(args) > 2 and (z := args[2]) is not None:
            z = cast_if_not_none(z, float)

    # During debugging z was sometimes None and sometimes 0, so a default is
    # applied uniformly here. Ideally the callers would normalize this.
    x = x if x is not None else 0
    y = y if y is not None else 0
    z = z if z is not None else 0

    # The resolved coordinates go to the parent initializer as keywords.
    kwargs.update(x=x, y=y, z=z)

    super().__init__(precision=precision, **kwargs)

__lt__

__lt__(other)

FIXME: 考虑z

源代码位于: logis/data_type/point.py
def __lt__(self, other):
    """
    Order points by ``(x, y)``.

    FIXME: take z into account
    """
    if not isinstance(other, Point):
        return NotImplemented
    # TODO: should it be enough for any single coordinate to be greater?
    mine = (self.x, self.y)
    theirs = (other.x, other.y)
    return mine < theirs

__repr__

__repr__()
源代码位于: logis/data_type/point.py
def __repr__(self):
    """Return ``Point(x, y, z)`` with the three coordinates."""
    return "Point({}, {}, {})".format(self.x, self.y, self.z)

__str__

__str__()
源代码位于: logis/data_type/point.py
def __str__(self):
    """Return a string with the instance id followed by the coordinates."""
    return "Point(id={},x={},y={},z={})".format(self._id, self.x, self.y, self.z)

__sub__

__sub__(other: Point)
源代码位于: logis/data_type/point.py
def __sub__(self, other: "Point"):
    """Component-wise difference; both points must have the same unit."""

    def sub(a: Number, b: Number):
        # Both missing -> stays missing; otherwise a missing side counts as 0.
        if a is None and b is None:
            return None
        return (a or 0) - (b or 0)

    assert self.unit == other.unit, "unit must be same"
    dx = sub(self.x, other.x)
    dy = sub(self.y, other.y)
    dz = sub(self.z, other.z)
    return Point.of(x=dx, y=dy, z=dz, unit=self.unit)

from_tuple classmethod

from_tuple(args: Tuple[Number], **kwargs)

从元组创建点

源代码位于: logis/data_type/point.py
@classmethod
def from_tuple(cls, args: Tuple[Number, ...], **kwargs):
    """
    Create a point from a tuple of coordinates.

    The tuple is handed to the class constructor as a single positional
    argument; ``Tuple[Number, ...]`` marks it as variable-length (the former
    ``Tuple[Number]`` described a one-element tuple).
    """
    return cls(args, **kwargs)

of classmethod

of(x: Optional[Number] = None, y: Optional[Number] = None, z: Optional[Number] = None, **kwargs) -> Point
源代码位于: logis/data_type/point.py
@classmethod
def of(
    cls,
    x: Optional[Number] = None,
    y: Optional[Number] = None,
    z: Optional[Number] = None,
    **kwargs,
) -> "Point":
    """
    Create a point from separate x, y, z values.

    Built through ``cls`` rather than a hard-coded class name, so that calling
    this on a subclass yields an instance of that subclass.
    """
    return cls(x, y, z, **kwargs)

to_tuple

to_tuple() -> TuplePoint
源代码位于: logis/data_type/point.py
def to_tuple(self) -> TuplePoint:
    """Return the coordinates as an ``(x, y, z)`` tuple."""
    return self.x, self.y, self.z

try_parse classmethod

try_parse(dc: dict, **kwargs) -> Point

尝试读取X、Y、Z坐标,返回一个Point对象

源代码位于: logis/data_type/point.py
@classmethod
def try_parse(cls, dc: dict, **kwargs) -> "Point":
    """
    Read the "X", "Y" and "Z" keys from a dict and build a point.

    Missing keys are passed on as None. The instance is built through ``cls``
    rather than a hard-coded class name, so that calling this on a subclass
    yields an instance of that subclass.

    Raises:
        ValueError: if ``dc`` is not a dict.
    """
    if not isinstance(dc, dict):
        raise ValueError("Expected a dictionary for Point parsing.")
    xyz = [dc.get(axis) for axis in ("X", "Y", "Z")]
    return cls(xyz, **kwargs)

SimConfig

源代码位于: logis/biz/sim/data_type/__init__.py
# Simulation configuration: the inherited metadata plus optional file paths
# and run limits. Every ``resolve_*`` method prefers the explicitly configured
# value and otherwise defers to the same-named helper in
# ``logis.biz.sim.path_util``, which receives ``model_path``.
class SimConfig(SimMetadata):
    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

    json_path: Optional[str] = None
    excel_path: Optional[str] = None
    result_path: Optional[str] = None

    # Maximum real (wall-clock) run time in seconds; unlimited by default
    max_run_time: Optional[float] = None
    # Maximum simulated time; unlimited by default
    max_sim_time: Optional[float] = None

    def resolve_model_name(self) -> Optional[str]:
        # The helper is imported inside the method, as in the resolvers below
        from logis.biz.sim.path_util import resolve_model_name as derive_name

        return derive_name(self.model_path)

    def resolve_result_path(self):
        if not self.result_path:
            from logis.biz.sim.path_util import resolve_result_path as derive_path

            return derive_path(self.model_path)
        return Path(self.result_path)

    def resolve_excel_path(self) -> Optional[Path]:
        if not self.excel_path:
            from logis.biz.sim.path_util import resolve_excel_path as derive_path

            return derive_path(self.model_path)
        return Path(self.excel_path)

    def resolve_json_path(self) -> Optional[Path]:
        if not self.json_path:
            from logis.biz.sim.path_util import resolve_json_path as derive_path

            return derive_path(self.model_path)
        return Path(self.json_path)

excel_path class-attribute instance-attribute

excel_path: Optional[str] = None

json_path class-attribute instance-attribute

json_path: Optional[str] = None

max_run_time class-attribute instance-attribute

max_run_time: Optional[float] = None

max_sim_time class-attribute instance-attribute

max_sim_time: Optional[float] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

result_path class-attribute instance-attribute

result_path: Optional[str] = None

resolve_excel_path

resolve_excel_path() -> Optional[Path]
源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_excel_path(self) -> Optional[Path]:
    """
    Return the Excel path: ``excel_path`` when it is set, otherwise whatever
    ``path_util.resolve_excel_path`` yields for ``model_path``.
    """
    if not self.excel_path:
        from logis.biz.sim.path_util import resolve_excel_path as derive_path

        return derive_path(self.model_path)
    return Path(self.excel_path)

resolve_json_path

resolve_json_path() -> Optional[Path]
源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_json_path(self) -> Optional[Path]:
    """
    Return the JSON path: ``json_path`` when it is set, otherwise whatever
    ``path_util.resolve_json_path`` yields for ``model_path``.
    """
    if not self.json_path:
        from logis.biz.sim.path_util import resolve_json_path as derive_path

        return derive_path(self.model_path)
    return Path(self.json_path)

resolve_model_name

resolve_model_name() -> Optional[str]
源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_model_name(self) -> Optional[str]:
    """Return what ``path_util.resolve_model_name`` yields for ``model_path``."""
    from logis.biz.sim.path_util import resolve_model_name as derive_name

    return derive_name(self.model_path)

resolve_result_path

resolve_result_path()
源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_result_path(self):
    """
    Return the result path: ``result_path`` when it is set, otherwise whatever
    ``path_util.resolve_result_path`` yields for ``model_path``.
    """
    if not self.result_path:
        from logis.biz.sim.path_util import resolve_result_path as derive_path

        return derive_path(self.model_path)
    return Path(self.result_path)

SimContext

仿真上下文

源代码位于: logis/biz/sim/data_type/__init__.py
class SimContext(SimMetadata):
    """
    Simulation context.

    Holds the simpy environment, the event emitter, the optional
    configuration, an error list, the network graph and an optional websocket
    connection, plus two simpy events (finish / terminate) that are created
    on first access.
    """

    # Signal for normal completion
    __finish_event__: Optional[simpy.Event] = None
    # Signal for terminating the simulation
    __terminate_event__: Optional[simpy.Event] = None
    env: simpy.Environment = Field(default_factory=simpy.Environment)
    event_emitter: EventEmitter = Field(default_factory=EventEmitter)
    sim_config: Optional[SimConfig] = None
    errors: List[str] = Field(default_factory=list)
    network: DiGraph = Field(default_factory=DiGraph)
    ws_conn: Union[ClientConnection, ServerConnection, None] = None
    event_loop: Optional[asyncio.BaseEventLoop] = None

    extra: Optional[Dict[str, Any]] = None

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

    @property
    def terminate_event(self) -> simpy.Event:
        """
        The simulation's terminate event, created from ``env`` on first access.
        """
        assert self.env, "环境未初始化"
        event = self.__terminate_event__
        if event is None:
            event = self.__terminate_event__ = self.env.event()
        return event

    @property
    def finish_event(self) -> simpy.Event:
        """
        The simulation's finish event, created from ``env`` on first access.
        """
        assert self.env, "环境未初始化"
        event = self.__finish_event__
        if event is None:
            event = self.__finish_event__ = self.env.event()
        return event

    def trigger_finish_event_if_not(self):
        """
        Trigger the finish event unless it has already been triggered.
        """
        if self.finish_event.triggered:
            return
        self.finish_event.succeed()

    def trigger_terminate_event_if_not(self):
        """
        Trigger the terminate event unless it has already been triggered.
        """
        if self.terminate_event.triggered:
            return
        self.terminate_event.succeed()

    def is_terminated(self):
        """
        Tell whether the simulation has been terminated.
        """
        event = self.terminate_event
        return event.triggered

    def is_finished(self):
        """
        Tell whether the simulation has finished.
        """
        event = self.finish_event
        return event.triggered

    def is_over(self):
        """
        Tell whether the simulation is over: the websocket connection (if
        any) is closed, or the simulation has finished or been terminated.
        """
        conn = self.ws_conn
        if conn and is_websocket_closed(conn):
            return True
        finished = self.is_finished()
        return finished if finished else self.is_terminated()

    def resolve_sim_id(self) -> Optional[str]:
        """
        Prefer the id stored in ``sim_config``; fall back to this object's own id.
        """
        config = self.sim_config
        configured = config.sim_id if config else None
        return configured or self.sim_id

    def resolve_model_path(self) -> Optional[Path]:
        """
        Prefer the ``model_path`` stored in ``sim_config``; fall back to this
        object's own ``model_path``.
        """
        config = self.sim_config
        configured = config.model_path if config else None
        return configured or self.model_path

    def resolve_model_name(self) -> Optional[str]:
        """Return the final component of the resolved model path, or ``None``."""
        path = self.resolve_model_path()
        if not path:
            return None
        return path.name

__finish_event__ class-attribute instance-attribute

__finish_event__: Optional[Event] = None

__terminate_event__ class-attribute instance-attribute

__terminate_event__: Optional[Event] = None

env class-attribute instance-attribute

env: Environment = Field(default_factory=Environment)

errors class-attribute instance-attribute

errors: List[str] = Field(default_factory=list)

event_emitter class-attribute instance-attribute

event_emitter: EventEmitter = Field(default_factory=EventEmitter)

event_loop class-attribute instance-attribute

event_loop: Optional[BaseEventLoop] = None

extra class-attribute instance-attribute

extra: Optional[Dict[str, Any]] = None

finish_event property

finish_event: Event

仿真完成事件

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

network class-attribute instance-attribute

network: DiGraph = Field(default_factory=DiGraph)

sim_config class-attribute instance-attribute

sim_config: Optional[SimConfig] = None

terminate_event property

terminate_event: Event

仿真终止事件

ws_conn class-attribute instance-attribute

ws_conn: Union[ClientConnection, ServerConnection, None] = None

is_finished

is_finished()

判断仿真是否已完成

源代码位于: logis/biz/sim/data_type/__init__.py
def is_finished(self):
    """
    Tell whether the simulation has finished (its finish event is triggered).
    """
    event = self.finish_event
    return event.triggered

is_over

is_over()

判断仿真是否结束

源代码位于: logis/biz/sim/data_type/__init__.py
def is_over(self):
    """
    Tell whether the simulation is over: the websocket connection (if any)
    is closed, or the simulation has finished or been terminated.
    """
    conn = self.ws_conn
    if conn and is_websocket_closed(conn):
        return True
    finished = self.is_finished()
    return finished if finished else self.is_terminated()

is_terminated

is_terminated()

判断仿真是否已被终止

源代码位于: logis/biz/sim/data_type/__init__.py
def is_terminated(self):
    """
    Tell whether the simulation has been terminated (its terminate event is triggered).
    """
    event = self.terminate_event
    return event.triggered

resolve_model_name

resolve_model_name() -> Optional[str]
源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_model_name(self) -> Optional[str]:
    """Return the final component of the resolved model path, or ``None`` without a path."""
    path = self.resolve_model_path()
    if not path:
        return None
    return path.name

resolve_model_path

resolve_model_path() -> Optional[Path]

优先读取sim_config中的model_path,其次是本结构体中的model_path

源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_model_path(self) -> Optional[Path]:
    """
    Prefer the ``model_path`` stored in ``sim_config``; fall back to this
    object's own ``model_path``.
    """
    config = self.sim_config
    configured = config.model_path if config else None
    return configured or self.model_path

resolve_sim_id

resolve_sim_id() -> Optional[str]

优先读取sim_config中的id,其次是本结构体中的id

源代码位于: logis/biz/sim/data_type/__init__.py
def resolve_sim_id(self) -> Optional[str]:
    """
    Prefer the id stored in ``sim_config``; fall back to this object's own id.
    """
    config = self.sim_config
    configured = config.sim_id if config else None
    return configured or self.sim_id

trigger_finish_event_if_not

trigger_finish_event_if_not()

如果仿真完成事件还没有被触发,就触发它

源代码位于: logis/biz/sim/data_type/__init__.py
def trigger_finish_event_if_not(self):
    """
    Trigger the finish event unless it has already been triggered.
    """
    if self.finish_event.triggered:
        return
    self.finish_event.succeed()

trigger_terminate_event_if_not

trigger_terminate_event_if_not()

如果仿真终止事件还没有被触发,就触发它

源代码位于: logis/biz/sim/data_type/__init__.py
def trigger_terminate_event_if_not(self):
    """
    Trigger the terminate event unless it has already been triggered.
    """
    if self.terminate_event.triggered:
        return
    self.terminate_event.succeed()

SimEvent

仿真过程中产生的事件

此结构体起初是个大杂烩,包含了仿真过程中产生的事件可能用到的所有字段,后来逐步改造成泛型

TODO: 这里之所以还继承OldSimEvent,是因为历史代码里有很多地方已经用了SimEvent(实际上是OldSimEvent),后续会逐步替换掉并去除这里的继承

源代码位于: logis/biz/sim/data_type/__init__.py
class SimEvent(OldSimEvent, Generic[T]):
    """
    An event produced while the simulation runs.

    This structure started out as a catch-all holding every field a
    simulation event might need, and has since been reworked step by step
    into a generic.

    TODO: it still inherits from OldSimEvent because much legacy code already
    uses SimEvent where it really means OldSimEvent; those uses will be
    replaced gradually and this inheritance removed afterwards.
    """

    source_id: Optional[TmpId] = Field(
        validation_alias=AliasChoices("source_id", "src"), default=None
    )

    # Time at which the simulation event occurred
    time: Optional[Union[int, float]] = Field(default=None)
    # Event type
    event_type: Optional[str] = Field(
        validation_alias=AliasChoices("eventType", "event_type", "type"), default=None
    )
    # Event data
    data: Optional[T] = None
    extra: Optional[Dict[str, Any]] = None

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG
    # Real-world timestamp
    timestamp: Optional[float] = None

    def to_config(self):
        """
        Build a SimConfig from this event's sim_id, model_path, mode and
        send_step_info_realtime; no other field is copied.
        """
        return SimConfig(
            sim_id=self.sim_id,
            model_path=self.model_path,
            mode=self.mode,
            send_step_info_realtime=self.send_step_info_realtime,
        )

data class-attribute instance-attribute

data: Optional[T] = None

event_type class-attribute instance-attribute

event_type: Optional[str] = Field(validation_alias=AliasChoices('eventType', 'event_type', 'type'), default=None)

extra class-attribute instance-attribute

extra: Optional[Dict[str, Any]] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

source_id class-attribute instance-attribute

source_id: Optional[TmpId] = Field(validation_alias=AliasChoices('source_id', 'src'), default=None)

time class-attribute instance-attribute

time: Optional[Union[int, float]] = Field(default=None)

timestamp class-attribute instance-attribute

timestamp: Optional[float] = None

to_config

to_config()
源代码位于: logis/biz/sim/data_type/__init__.py
def to_config(self):
    """
    Build a SimConfig from this event's sim_id, model_path, mode and
    send_step_info_realtime; no other field is copied.
    """
    carried = ("sim_id", "model_path", "mode", "send_step_info_realtime")
    values = {name: getattr(self, name) for name in carried}
    return SimConfig(**values)

SimMetadata

仿真元数据

源代码位于: logis/biz/sim/data_type/__init__.py
class SimMetadata(BaseModel):
    """
    Simulation metadata.

    Each field below also accepts the alternative input name listed in its
    ``AliasChoices``.
    """

    sim_id: Optional[str] = Field(
        validation_alias=AliasChoices("sim_id", "simId"),
        default=None,
    )
    model_path: Optional[Path] = Field(
        validation_alias=AliasChoices("model_path", "modelPath"), default=None
    )
    # The legacy logic only supports a synchronous mode with a single
    # simulation; the new logic adds a fully asynchronous mode
    mode: Optional[Literal["sync", "async"]] = None

    user_id: Optional[str] = Field(
        validation_alias=AliasChoices("user_id", "user"), default=None
    )

    client_version: Optional[str] = Field(
        validation_alias=AliasChoices("client_version", "version"), default=None
    )

    server_version: Optional[str] = Field(
        validation_alias=AliasChoices("server_version", "serverVersion"), default=None
    )

    send_step_info_realtime: Optional[bool] = Field(
        None,
        validation_alias=AliasChoices(
            "send_step_info_realtime", "sendStepInfoRealtime"
        ),
    )

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

client_version class-attribute instance-attribute

client_version: Optional[str] = Field(validation_alias=AliasChoices('client_version', 'version'), default=None)

mode class-attribute instance-attribute

mode: Optional[Literal['sync', 'async']] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

model_path class-attribute instance-attribute

model_path: Optional[Path] = Field(validation_alias=AliasChoices('model_path', 'modelPath'), default=None)

send_step_info_realtime class-attribute instance-attribute

send_step_info_realtime: Optional[bool] = Field(None, validation_alias=AliasChoices('send_step_info_realtime', 'sendStepInfoRealtime'))

server_version class-attribute instance-attribute

server_version: Optional[str] = Field(validation_alias=AliasChoices('server_version', 'serverVersion'), default=None)

sim_id class-attribute instance-attribute

sim_id: Optional[str] = Field(validation_alias=AliasChoices('sim_id', 'simId'), default=None)

user_id class-attribute instance-attribute

user_id: Optional[str] = Field(validation_alias=AliasChoices('user_id', 'user'), default=None)

Storable

可存储的

使用鸭子类型实现,相比ABC更灵活

源代码位于: logis/iface/container.py
@runtime_checkable
class Storable(Protocol):
    """
    Something that can be stored and retrieved.

    Declared as a duck-typed protocol, which is more flexible than an ABC.
    """

    def pre_store(self, *args, **kwargs):
        """
        Pre-store step, typically used for resource checks and validation.
        """
        ...

    def pre_retrieve(self, *args, **kwargs):
        """
        Pre-retrieve step, typically used for resource checks and validation.
        """
        ...

    def store(self, *args, **kwargs) -> StoreResult:
        """
        The actual store operation.
        """
        ...

    def retrieve(self, *args, **kwargs) -> RetrieveResult:
        """
        The actual retrieve operation.
        """
        ...

pre_retrieve

pre_retrieve(*args, **kwargs)

预取回,一般用来实现资源检查、校验

源代码位于: logis/iface/container.py
def pre_retrieve(self, *args, **kwargs):
    """
    Pre-retrieve step, typically used for resource checks and validation.

    This default does nothing.
    """
    return None

pre_store

pre_store(*args, **kwargs)

预存储,一般用来实现资源检查、校验

源代码位于: logis/iface/container.py
def pre_store(self, *args, **kwargs):
    """
    Pre-store step, typically used for resource checks and validation.

    This default does nothing.
    """
    return None

retrieve

retrieve(*args, **kwargs) -> RetrieveResult

真正的取回操作

源代码位于: logis/iface/container.py
def retrieve(self, *args, **kwargs) -> RetrieveResult:
    """
    The actual retrieve operation.

    This default does nothing and returns ``None``.
    """
    return None

store

store(*args, **kwargs) -> StoreResult

真正的存储操作

源代码位于: logis/iface/container.py
def store(self, *args, **kwargs) -> StoreResult:
    """
    The actual store operation.

    This default does nothing and returns ``None``.
    """
    return None

TaskGraph

以任务id作为节点id组成的任务树,内部数据结构是一个有向无环图

源代码位于: logis/task/manager.py
class TaskGraph(AbstractTaskManager):
    """
    Task tree whose node ids are task ids; the underlying data structure is a
    directed acyclic graph with edges pointing from parent to child.
    """

    # Node-attribute key that get_task() falls back to when looking up a task
    __KEY_TASK__ = "task"

    def __init__(self, **attr):
        # ``attr`` is accepted but not used
        self.__graph__ = DiGraph()
        # task id -> the object that was handed to add_task()
        self.__id_task_map__ = defaultdict()

    def parse_task_id(self, task: TaskLike) -> TaskId:
        """
        Return the id for ``task``: a str/int is its own id, an ITask is asked
        via ``get_task_id()``.

        Raises:
            ValueError: for any other type.
        """
        if isinstance(task, (str, int)):
            return task
        if isinstance(task, ITask):
            return task.get_task_id()
        raise ValueError(f"Unknown task type {type(task)}")

    def task_size(self) -> int:
        """
        Return the number of tasks (graph nodes).
        """
        return self.__graph__.number_of_nodes()

    def tasks(self, only_id: bool = False) -> Iterator[ITask]:
        """
        Iterate over all tasks; with ``only_id=True`` the node ids are yielded
        instead of the looked-up task objects.
        """
        for node_id in self.__graph__.nodes:
            yield node_id if only_id else self.get_task(node_id)

    def get_task(self, task_id: TaskId) -> Optional[ITask]:
        """
        Look a task up by id: first in the id map, then in the graph node's
        ``"task"`` attribute. Returns ``None`` when neither has it.
        """
        task = self.__id_task_map__.get(task_id)
        if task:
            return task
        node = self.__graph__.nodes.get(task_id)
        return node.get(self.__KEY_TASK__) if node else None

    def find_by(self):
        raise NotImplementedError("TaskGraph.find_by not implemented")

    def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
        """
        Register ``task`` as a node (``attr`` becomes the node's attributes)
        and, when ``parent`` is given, add the parent if it is missing and
        link parent -> task.
        """
        task_id = self.parse_task_id(task)
        self.__id_task_map__[task_id] = task
        self.__graph__.add_node(task_id, **attr)
        if parent:
            parent_id = self.parse_task_id(parent)
            self.add_task_if_absent(parent)
            assert (
                parent_id != task_id
            ), f"parent task {parent_id} and task itself {task_id} should be different"
            self.__graph__.add_edge(parent_id, task_id)

    def add_task_if_absent(
        self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
    ) -> bool:
        """
        Add ``task`` only when its id is not yet a node.

        Returns:
            ``True`` when the task was ALREADY present (nothing was added),
            ``False`` when it has just been added.
        """
        task_id = self.parse_task_id(task)
        has = self.__graph__.has_node(task_id)
        if not has:
            self.add_task(task, parent=parent, **attr)
        return has

    def remove_task(self, task: TaskLike, cascade: bool = False, **kwargs):
        """
        Remove ``task`` from the graph and from the id map; with
        ``cascade=True`` its direct children are removed first, recursively.
        """
        task_id = self.parse_task_id(task)
        if cascade:
            # Snapshot the children: the graph is mutated while recursing
            for child_id in list(self.get_children_id(task, strict=True)):
                self.remove_task(child_id, cascade=cascade, **kwargs)
        self.__graph__.remove_node(task_id)
        # Drop the cached object as well; otherwise get_task() keeps returning
        # a task that __contains__ already reports as absent
        self.__id_task_map__.pop(task_id, None)

    def remove_task_if_present(
        self, task: TaskLike, cascade: bool = False, **kwargs
    ) -> bool:
        """
        Remove ``task`` only when its id is a node; returns whether it was present.
        """
        task_id = self.parse_task_id(task)
        has = self.__graph__.has_node(task_id)
        if has:
            self.remove_task(task, cascade=cascade, **kwargs)
        return has

    def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterator[TaskId]:
        """
        Return the ids of the direct children (graph successors).
        ``strict=False`` is not implemented.
        """
        if strict:
            return self.__graph__.successors(self.parse_task_id(task))
        raise NotImplementedError()

    def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
        """
        Return the ids of the direct parents (graph predecessors).
        ``strict=False`` is not implemented.
        """
        if strict:
            return self.__graph__.predecessors(self.parse_task_id(task))
        raise NotImplementedError()

    def has_parent_child_relationship(
        self, a: TaskLike, b: TaskLike, strict: bool = True
    ) -> bool:
        """
        Tell whether ``a`` is a parent of ``b``: a direct edge when ``strict``,
        otherwise any path from ``a`` to ``b``.
        """
        a_id = self.parse_task_id(a)
        b_id = self.parse_task_id(b)
        return (
            self.__graph__.has_edge(a_id, b_id)
            if strict
            else has_path(self.__graph__, a_id, b_id)
        )

    def __contains__(self, task: TaskLike) -> bool:
        """
        Check whether the task exists.
        """
        return self.__graph__.has_node(self.parse_task_id(task))

    def update_task_status(self, task: TaskId, status: TaskStatus):
        """
        Set the task's status to the given value.
        """
        task_id = self.parse_task_id(task)
        task: ITask = self.get_task(task_id)
        assert task, f"任务 {task_id} 不存在"
        task.update_status(status)

    def is_status_at(self, task, status, infer=True, update=True):
        """
        Tell whether ``task`` is at ``status``.

        When the task itself does not report the status and ``infer`` is set,
        the answer is inferred from its direct children: it holds only if
        there is at least one child and every child (recursively) is at
        ``status``. With ``update`` set, an inferred match is written back to
        the task.
        """
        task_id = self.parse_task_id(task)
        task: ITask = self.get_task(task_id)

        match = task.is_status_at(status)
        if match is True:
            return match
        if not infer:
            return match

        children_ids = list(self.get_children_id(task_id, strict=True))
        all_children_match = (
            False
            if not children_ids
            else all(
                self.is_status_at(child_id, status, infer=infer, update=update)
                for child_id in children_ids
            )
        )
        if all_children_match and update:
            self.update_task_status(task_id, status)
        return all_children_match

    def is_task_finished(self, task: TaskLike, update: bool = True, infer=True):
        """Shortcut for ``is_status_at(task, TaskStatus.FINISHED, ...)``."""
        return self.is_status_at(task, TaskStatus.FINISHED, infer=infer, update=update)

__KEY_TASK__ class-attribute instance-attribute

__KEY_TASK__ = 'task'

__graph__ instance-attribute

__graph__ = DiGraph()

__id_task_map__ instance-attribute

__id_task_map__ = defaultdict()

__contains__

__contains__(task: TaskLike) -> bool

检查任务是否存在

源代码位于: logis/task/manager.py
def __contains__(self, task: TaskLike) -> bool:
    """
    Check whether the task exists, i.e. whether its id is a node of the graph.
    """
    task_id = self.parse_task_id(task)
    return self.__graph__.has_node(task_id)

__init__

__init__(**attr)
源代码位于: logis/task/manager.py
def __init__(self, **attr):
    """Start with an empty graph and an empty id-to-task map; ``attr`` is not used."""
    self.__graph__, self.__id_task_map__ = DiGraph(), defaultdict()

add_task

add_task(task: TaskLike, parent: Optional[TaskLike] = None, **attr)
源代码位于: logis/task/manager.py
def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
    """
    Register ``task`` as a node (``attr`` becomes the node's attributes) and,
    when ``parent`` is given, add the parent if it is missing and link
    parent -> task.
    """
    task_id = self.parse_task_id(task)
    self.__id_task_map__[task_id] = task
    self.__graph__.add_node(task_id, **attr)
    if not parent:
        return
    parent_id = self.parse_task_id(parent)
    self.add_task_if_absent(parent)
    assert (
        parent_id != task_id
    ), f"parent task {parent_id} and task itself {task_id} should be different"
    self.__graph__.add_edge(parent_id, task_id)

add_task_if_absent

add_task_if_absent(task: TaskLike, parent: Optional[TaskLike] = None, **attr) -> bool
源代码位于: logis/task/manager.py
def add_task_if_absent(
    self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
) -> bool:
    """
    Add ``task`` only when its id is not yet a node.

    Returns:
        ``True`` when the task was ALREADY present (nothing was added),
        ``False`` when it has just been added.
    """
    task_id = self.parse_task_id(task)
    already_present = self.__graph__.has_node(task_id)
    if already_present:
        return already_present
    self.add_task(task, parent=parent, **attr)
    return already_present

find_by

find_by()
源代码位于: logis/task/manager.py
def find_by(self):
    """Not implemented for TaskGraph; always raises ``NotImplementedError``."""
    message = "TaskGraph.find_by not implemented"
    raise NotImplementedError(message)

get_children_id

get_children_id(task: TaskLike, strict: bool = True) -> Iterator[TaskId]
源代码位于: logis/task/manager.py
def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterator[TaskId]:
    """
    Return the ids of the direct children (graph successors) of ``task``.

    Raises:
        NotImplementedError: when ``strict`` is false.
    """
    if not strict:
        raise NotImplementedError()
    task_id = self.parse_task_id(task)
    return self.__graph__.successors(task_id)

get_parents_id

get_parents_id(task: TaskLike, strict: bool = True) -> Iterable[TaskId]
源代码位于: logis/task/manager.py
def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
    """
    Return the ids of the direct parents (graph predecessors) of ``task``.

    Raises:
        NotImplementedError: when ``strict`` is false.
    """
    if not strict:
        raise NotImplementedError()
    task_id = self.parse_task_id(task)
    return self.__graph__.predecessors(task_id)

get_task

get_task(task_id: TaskId) -> Optional[ITask]
源代码位于: logis/task/manager.py
def get_task(self, task_id: TaskId) -> Optional[ITask]:
    """
    Look a task up by id: first in the id map, then in the graph node's
    ``__KEY_TASK__`` attribute. Returns ``None`` when neither has it.
    """
    cached = self.__id_task_map__.get(task_id)
    if cached:
        return cached
    node = self.__graph__.nodes.get(task_id)
    if not node:
        return None
    return node.get(self.__KEY_TASK__)

has_parent_child_relationship

has_parent_child_relationship(a: TaskLike, b: TaskLike, strict: bool = True) -> bool
源代码位于: logis/task/manager.py
def has_parent_child_relationship(
    self, a: TaskLike, b: TaskLike, strict: bool = True
) -> bool:
    """
    Tell whether ``a`` is a parent of ``b``: a direct edge a -> b when
    ``strict``, otherwise any path from ``a`` to ``b``.
    """
    a_id = self.parse_task_id(a)
    b_id = self.parse_task_id(b)
    if strict:
        return self.__graph__.has_edge(a_id, b_id)
    return has_path(self.__graph__, a_id, b_id)

is_status_at

is_status_at(task, status, infer=True, update=True)
源代码位于: logis/task/manager.py
def is_status_at(self, task, status, infer=True, update=True):
    """
    Tell whether ``task`` is at ``status``.

    When the task itself does not report the status and ``infer`` is set, the
    answer is inferred from its direct children: it holds only if there is at
    least one child and every child (recursively) is at ``status``. With
    ``update`` set, an inferred match is written back to the task.
    """
    task_id = self.parse_task_id(task)
    task_obj: ITask = self.get_task(task_id)

    own_match = task_obj.is_status_at(status)
    if own_match is True or not infer:
        return own_match

    child_ids = list(self.get_children_id(task_id, strict=True))
    # A task without children can never be inferred to match
    inferred = bool(child_ids) and all(
        self.is_status_at(child_id, status, infer=infer, update=update)
        for child_id in child_ids
    )
    if inferred and update:
        self.update_task_status(task_id, status)
    return inferred

is_task_finished

is_task_finished(task: TaskLike, update: bool = True, infer=True)
源代码位于: logis/task/manager.py
def is_task_finished(self, task: TaskLike, update: bool = True, infer=True):
    """Shortcut for ``is_status_at`` with ``TaskStatus.FINISHED`` as the status."""
    target_status = TaskStatus.FINISHED
    return self.is_status_at(task, target_status, update=update, infer=infer)

parse_task_id

parse_task_id(task: TaskLike) -> TaskId
源代码位于: logis/task/manager.py
def parse_task_id(self, task: TaskLike) -> TaskId:
    """
    Return the id for ``task``: a str/int is its own id, an ITask is asked via
    ``get_task_id()``.

    Raises:
        ValueError: for any other type.
    """
    # Raw ids pass through untouched
    if isinstance(task, (str, int)):
        return task
    if not isinstance(task, ITask):
        raise ValueError(f"Unknown task type {type(task)}")
    return task.get_task_id()

remove_task

remove_task(task: TaskLike, cascade: bool = False, **kwargs)
源代码位于: logis/task/manager.py
def remove_task(self, task: TaskLike, cascade: bool = False, **kwargs):
    """
    Remove ``task`` from the graph and from the id map.

    Args:
        task: the task or its id.
        cascade: when true, the direct children are removed first, recursively.
        **kwargs: forwarded unchanged to the recursive calls.
    """
    task_id = self.parse_task_id(task)
    if cascade:
        # Snapshot the children: the graph is mutated while recursing
        for child_id in list(self.get_children_id(task, strict=True)):
            self.remove_task(child_id, cascade=cascade, **kwargs)
    self.__graph__.remove_node(task_id)
    # Drop the cached object as well; otherwise get_task() keeps returning a
    # task that is no longer a node of the graph
    self.__id_task_map__.pop(task_id, None)

remove_task_if_present

remove_task_if_present(task: TaskLike, cascade: bool = False, **kwargs) -> bool
源代码位于: logis/task/manager.py
def remove_task_if_present(
    self, task: TaskLike, cascade: bool = False, **kwargs
) -> bool:
    """
    Remove ``task`` only when its id is a node of the graph.

    Returns:
        Whether the task was present before the call.
    """
    task_id = self.parse_task_id(task)
    was_present = self.__graph__.has_node(task_id)
    if not was_present:
        return was_present
    self.remove_task(task, cascade=cascade, **kwargs)
    return was_present

task_size

task_size() -> int

获取任务数量

源代码位于: logis/task/manager.py
def task_size(self) -> int:
    """
    Return the number of tasks, i.e. the number of nodes in the graph.
    """
    node_count = self.__graph__.number_of_nodes()
    return node_count

tasks

tasks(only_id: bool = False) -> Iterator[ITask]
源代码位于: logis/task/manager.py
def tasks(self, only_id: bool = False) -> Iterator[ITask]:
    """
    Iterate over all graph nodes, yielding the node id when ``only_id`` is
    set and the result of ``get_task`` for that id otherwise.
    """
    for node_id in self.__graph__.nodes:
        if only_id:
            yield node_id
        else:
            yield self.get_task(node_id)

update_task_status

update_task_status(task: TaskId, status: TaskStatus)

将任务状态设置为指定值

源代码位于: logis/task/manager.py
def update_task_status(self, task: TaskId, status: TaskStatus):
    """
    Set the task's status to the given value.

    The task must exist: a failed lookup trips the assertion below.
    """
    task_id = self.parse_task_id(task)
    target: ITask = self.get_task(task_id)
    assert target, f"任务 {task_id} 不存在"
    target.update_status(status)

TimeWindowWaveStrategy

基于时间窗口的波次划分策略

源代码位于: logis/biz/sim/iface/wave.py
class TimeWindowWaveStrategy(WaveGroupingStrategy):
    """
    Wave grouping strategy based on time windows.
    """

    def group(self, orders, config):
        """
        Split ``orders`` into waves, one per non-empty time window.

        Orders are sorted by ``get_order_time()``. The window boundaries are
        ``config.time_index`` when it is given and non-empty, otherwise a
        ``pandas.date_range`` from the earliest to the latest order time with
        frequency ``config.time_interval``. Window ``i`` is the half-open span
        ``[time_index[i], time_index[i + 1])``; the last window has no upper
        bound. Orders earlier than the first boundary belong to no window and
        are left out.

        Returns:
            The list of waves (each a list of orders), after a wave id has
            been assigned through ``_generate_wave_id``.
        """
        if not orders:
            return []
        orders = sorted(orders, key=lambda od: od.get_order_time())

        time_index = config.time_index
        # Test explicitly: the truth value of a pandas index is ambiguous and
        # raises ValueError, so ``time_index or ...`` cannot be used here
        if time_index is None or len(time_index) == 0:
            time_index = pandas.date_range(
                orders[0].get_order_time(),
                orders[-1].get_order_time(),
                freq=config.time_interval,
            )

        waves: List[List[IOrder]] = []
        total = len(orders)
        cursor = 0
        for i, left in enumerate(time_index):
            right = time_index[i + 1] if i + 1 < len(time_index) else None
            # Skip orders that precede this window so they cannot stall the
            # remaining windows
            while cursor < total and orders[cursor].get_order_time() < left:
                cursor += 1
            start = cursor
            while cursor < total and (
                right is None or orders[cursor].get_order_time() < right
            ):
                cursor += 1
            if cursor > start:
                waves.append(orders[start:cursor])

        self._generate_wave_id(waves)
        return waves

group

group(orders, config)
源代码位于: logis/biz/sim/iface/wave.py
def group(self, orders, config):
    """
    Split ``orders`` into waves, one per non-empty time window.

    Orders are sorted by ``get_order_time()``. The window boundaries are
    ``config.time_index`` when it is given and non-empty, otherwise a
    ``pandas.date_range`` from the earliest to the latest order time with
    frequency ``config.time_interval``. Window ``i`` is the half-open span
    ``[time_index[i], time_index[i + 1])``; the last window has no upper
    bound. Orders earlier than the first boundary belong to no window and are
    left out.

    Returns:
        The list of waves (each a list of orders), after a wave id has been
        assigned through ``_generate_wave_id``.
    """
    if not orders:
        return []
    orders = sorted(orders, key=lambda od: od.get_order_time())

    time_index = config.time_index
    # Test explicitly: the truth value of a pandas index is ambiguous and
    # raises ValueError, so ``time_index or ...`` cannot be used here
    if time_index is None or len(time_index) == 0:
        time_index = pandas.date_range(
            orders[0].get_order_time(),
            orders[-1].get_order_time(),
            freq=config.time_interval,
        )

    waves = []
    total = len(orders)
    cursor = 0
    for i, left in enumerate(time_index):
        right = time_index[i + 1] if i + 1 < len(time_index) else None
        # Skip orders that precede this window so they cannot stall the
        # remaining windows
        while cursor < total and orders[cursor].get_order_time() < left:
            cursor += 1
        start = cursor
        while cursor < total and (
            right is None or orders[cursor].get_order_time() < right
        ):
            cursor += 1
        if cursor > start:
            waves.append(orders[start:cursor])

    self._generate_wave_id(waves)
    return waves

UnitConfig

有关单位的配置项

源代码位于: logis/data_type/unitable.py
class UnitConfig(dict):
    """
    Unit-related configuration: a dict mapping a unit to its numeric factor.
    """

    def get_ratio(self, src: Unit, dst: Unit, src_quantity: NumberType = 1) -> Fraction:
        """
        Get the ratio from src to dst, scaled by ``src_quantity``.

        When both units are configured the result is
        ``self[dst] * src_quantity / self[src]``; otherwise the lookup is
        delegated to ``get_unit_ratio``. Non-integer inputs (floats, Decimals)
        are accepted; a float is converted to its exact binary value.

        Raises:
            ValueError: if the ratio cannot be determined.
        """
        if src == dst:
            # The one-argument form also accepts floats; Fraction(x, 1) does not
            return Fraction(src_quantity)
        if src in self and dst in self:
            return Fraction(self[dst]) * Fraction(src_quantity) / Fraction(self[src])
        try:
            return Fraction(get_unit_ratio(src, dst, src_quantity=src_quantity))
        except Exception as err:
            # Keep the original cause instead of silently discarding it
            raise ValueError(f"unknown unit: {src} or {dst}") from err

    def get_float_ratio(self, src: Unit, dst: Unit, *args, **kwargs) -> float:
        """
        Get the ratio from src to dst as a float.
        """
        return float(self.get_ratio(src, dst, *args, **kwargs))

    def get_int_ratio(self, src: Unit, dst: Unit, *args, **kwargs) -> int:
        """
        Get the ratio from src to dst as an int (the fraction is truncated).
        """
        return int(self.get_ratio(src, dst, *args, **kwargs))

    def alias(self, unit: Unit, *aliases: Unit):
        """
        Add aliases for a unit: each alias receives the value currently stored
        for ``unit`` (``None`` when ``unit`` is not configured).

        Returns:
            This config, so calls can be chained.
        """
        for alias in aliases:
            self[alias] = self.get(unit)
        return self

    def __or__(self, value: Dict):
        """
        Merge with ``value`` into a new UnitConfig; on key conflicts the
        entries of ``self`` win over those of ``value``.
        """
        obj = dict(value)
        obj.update(self)

        return UnitConfig(obj)

__or__

__or__(value: Dict)
源代码位于: logis/data_type/unitable.py
def __or__(self, value: Dict):
    """
    Merge with ``value`` into a new UnitConfig; on key conflicts the entries
    of ``self`` win over those of ``value``.
    """
    # ``self`` is unpacked last so that its entries take precedence
    return UnitConfig({**dict(value), **self})

alias

alias(unit: Unit, *aliases: Unit)

为单位添加别名

源代码位于: logis/data_type/unitable.py
def alias(self, unit: Unit, *aliases: Unit):
    """
    Add aliases for a unit: each alias receives the value currently stored
    for ``unit`` (``None`` when ``unit`` is not configured).

    Returns:
        This config, so calls can be chained.
    """
    shared_value = self.get(unit)
    for name in aliases:
        self[name] = shared_value
    return self

get_float_ratio

get_float_ratio(src: Unit, dst: Unit, *args, **kwargs) -> float

获取 src 到 dst 的倍率

源代码位于: logis/data_type/unitable.py
def get_float_ratio(self, src: Unit, dst: Unit, *args, **kwargs) -> float:
    """
    Get the ratio from src to dst as a float.

    Extra arguments are forwarded to ``get_ratio``.
    """
    ratio = self.get_ratio(src, dst, *args, **kwargs)
    return float(ratio)

get_int_ratio

get_int_ratio(src: Unit, dst: Unit, *args, **kwargs) -> int

获取 src 到 dst 的倍率

源代码位于: logis/data_type/unitable.py
def get_int_ratio(self, src: Unit, dst: Unit, *args, **kwargs) -> int:
    """
    Get the ratio from src to dst as an int (the value is truncated by ``int``).

    Extra arguments are forwarded to ``get_ratio``.
    """
    ratio = self.get_ratio(src, dst, *args, **kwargs)
    return int(ratio)

get_ratio

get_ratio(src: Unit, dst: Unit, src_quantity: NumberType = 1) -> Fraction

获取 src 到 dst 的倍率

源代码位于: logis/data_type/unitable.py
def get_ratio(self, src: Unit, dst: Unit, src_quantity: NumberType = 1) -> Fraction:
    """
    Get the ratio from src to dst, scaled by ``src_quantity``.

    When both units are configured the result is
    ``self[dst] * src_quantity / self[src]``; otherwise the lookup is
    delegated to ``get_unit_ratio``. Non-integer inputs (floats, Decimals) are
    accepted; a float is converted to its exact binary value.

    Raises:
        ValueError: if the ratio cannot be determined.
    """
    if src == dst:
        # The one-argument form also accepts floats; Fraction(x, 1) does not
        return Fraction(src_quantity)
    if src in self and dst in self:
        return Fraction(self[dst]) * Fraction(src_quantity) / Fraction(self[src])
    try:
        return Fraction(get_unit_ratio(src, dst, src_quantity=src_quantity))
    except Exception as err:
        # Keep the original cause instead of silently discarding it
        raise ValueError(f"unknown unit: {src} or {dst}") from err

WaveGroupingConfig

波次划分配置

源代码位于: logis/biz/sim/iface/wave.py
class WaveGroupingConfig(BaseModel):
    """
    Wave grouping configuration.
    """

    # Batch size
    batch_size: Optional[int] = None
    # Time window
    time_interval: Optional[timedelta] = None
    # Explicitly specified time periods
    time_index: Optional[pandas.DatetimeIndex] = None

    model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

batch_size class-attribute instance-attribute

batch_size: Optional[int] = None

model_config class-attribute instance-attribute

model_config = DEFAULT_PYDANTIC_MODEL_CONFIG

time_index class-attribute instance-attribute

time_index: Optional[DatetimeIndex] = None

time_interval class-attribute instance-attribute

time_interval: Optional[timedelta] = None

WaveGroupingStrategy

波次划分策略

源代码位于: logis/biz/sim/iface/wave.py
class WaveGroupingStrategy(IExpose):
    """
    Wave grouping strategy.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def group(
        self, orders: List[IOrder], config: WaveGroupingConfig
    ) -> List[List[IOrder]]:
        """
        Split a list of orders into waves.

        Args:
            orders: the list of orders
            config: the wave picking configuration

        Returns:
            The list of waves
        """
        ...

    def _generate_wave_id(self, waves: List[List[IOrder]]) -> Optional[Union[str, int]]:
        """
        Assign a wave id to every order: each wave gets one fresh uuid4
        string, set on all of its orders through ``set_wave_id``.

        Always returns ``None``.
        """
        if waves:
            for wave in waves:
                wave_id = str(uuid4())
                for od in wave:
                    od.set_wave_id(wave_id)
        return None

__init__

__init__(**kwargs)
源代码位于: logis/biz/sim/iface/wave.py
def __init__(self, **kwargs):
    # Forward every keyword argument unchanged to the parent initializer
    super().__init__(**kwargs)

group abstractmethod

group(orders: List[IOrder], config: WaveGroupingConfig) -> List[List[IOrder]]

对订单列表进行波次划分

参数:

名称 类型 描述 默认
orders List[IOrder]

订单列表

必需
config WaveGroupingConfig

波次拣选配置

必需

返回:

类型 描述
List[List[IOrder]]

波次列表

源代码位于: logis/biz/sim/iface/wave.py
@abstractmethod
def group(
    self, orders: List[IOrder], config: WaveGroupingConfig
) -> List[List[IOrder]]:
    """
    Split a list of orders into waves.

    Args:
        orders: the list of orders
        config: the wave picking configuration

    Returns:
        The list of waves
    """
    ...

WaveSizeStrategy

基于波次大小的波次划分策略

源代码位于: logis/biz/sim/iface/wave.py
class WaveSizeStrategy(WaveGroupingStrategy):
    """
    Wave grouping strategy based on wave size.
    """

    def group(self, orders, config):
        """
        Sort ``orders`` by ``get_order_time()`` and cut them into consecutive
        waves of at most ``config.batch_size`` orders each.

        Returns:
            The list of waves, after a wave id has been assigned through
            ``_generate_wave_id``. An empty input yields ``[]``.

        Raises:
            ValueError: if ``config.batch_size`` is ``None`` or not positive.
        """
        if not orders:
            return []
        batch_size = config.batch_size
        # A missing size cannot be compared, and a non-positive one would
        # produce an empty leading wave
        if batch_size is None or batch_size <= 0:
            raise ValueError(
                f"batch_size must be a positive number, got {batch_size!r}"
            )
        orders = sorted(orders, key=lambda od: od.get_order_time())
        waves: List[List[IOrder]] = []
        wave = []
        for od in orders:
            if len(wave) >= batch_size:
                waves.append(wave)
                wave = []
            wave.append(od)
        if wave:
            waves.append(wave)
        self._generate_wave_id(waves)
        return waves

group

group(orders, config)
源代码位于: logis/biz/sim/iface/wave.py
def group(self, orders, config):
    """
    Sort ``orders`` by ``get_order_time()`` and cut them into consecutive
    waves of at most ``config.batch_size`` orders each.

    Returns:
        The list of waves, after a wave id has been assigned through
        ``_generate_wave_id``. An empty input yields ``[]``.

    Raises:
        ValueError: if ``config.batch_size`` is ``None`` or not positive.
    """
    if not orders:
        return []
    batch_size = config.batch_size
    # A missing size cannot be compared, and a non-positive one would produce
    # an empty leading wave
    if batch_size is None or batch_size <= 0:
        raise ValueError(f"batch_size must be a positive number, got {batch_size!r}")
    orders = sorted(orders, key=lambda od: od.get_order_time())
    waves = []
    wave = []
    for od in orders:
        if len(wave) >= batch_size:
            waves.append(wave)
            wave = []
        wave.append(od)
    if wave:
        waves.append(wave)
    self._generate_wave_id(waves)
    return waves

test

test()

测试函数

源代码位于: logis/biz/sim/__init__.py
def test():
    """
    Test function: prints a fixed message.
    """
    message = "This is a test function in the sim module."
    print(message)