跳转至

任务

NumberType module-attribute

NumberType: TypeAlias = Union[Decimal, int, float, Fraction]

TaskId module-attribute

TaskId = Union[str, int]

TaskLike module-attribute

TaskLike = Union[ITask, TaskId]

TaskPriority module-attribute

TaskPriority = Union[str, int]

__doc__ module-attribute

__doc__ = '\n任务模块\n'

ABCEnumMeta

适用于enum类继承但是提示metaclass冲突的情况:1. 指定metaclass为本类即可

源代码位于: logis/data_type/base.py
class ABCEnumMeta(EnumMeta, ABCMeta):
    """
    Metaclass combining ``EnumMeta`` and ``ABCMeta``.

    Intended for enum classes whose inheritance would otherwise fail with a
    metaclass conflict: declare this class as the enum's ``metaclass``.
    """

AbstractTaskManager

任务管理器基础抽象类

源代码位于: logis/task/manager.py
class AbstractTaskManager(metaclass=ABCMeta):
    """
    Abstract base class for task managers.

    Every method except :meth:`is_status_at` is abstract. ``is_status_at``
    has a concrete default body that raises ``NotImplementedError``.
    """

    @abstractmethod
    def is_task_finished(
        self, task: TaskLike, update: bool = True, infer: bool = True
    ) -> bool:
        """
        Check whether a task has finished.

        Args:
            task (TaskLike): The task object (or its ID).
            update (bool, optional): Whether to update the task's attributes
                automatically once it is detected as finished.
            infer (bool, optional): Whether to infer this task's status from
                the status of its sub-tasks.

        Returns:
            bool: Whether the task has finished.
        """

    def is_status_at(
        self,
        task: TaskLike,
        status: TaskStatus,
        infer: bool = True,
        update: bool = True,
    ) -> bool:
        """
        Check whether a task is in the given status.

        Args:
            task (TaskLike): The task object (or its ID).
            status (TaskStatus): The status to test for.
            infer (bool, optional): Whether to infer this task's status from
                the status of its sub-tasks. Defaults to True.
            update (bool, optional): Whether to update the task's attributes
                automatically when its status matches. Defaults to True.

        Returns:
            bool: Whether the task is in the given status.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def task_size(self) -> int:
        """
        Return the number of tasks.
        """

    @abstractmethod
    def tasks(self, only_id: bool = False) -> Iterator[ITask]:
        """
        Iterate over all tasks.

        Args:
            only_id (bool, optional): Semantics are left to the
                implementation (``TaskGraph`` yields task IDs instead of
                task objects when this is True).
        """

    @abstractmethod
    def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
        """
        Add a task.

        Args:
            task (TaskLike): The task to add.
            parent (TaskLike | None): The parent task. Defaults to None.
            **attr: Extra keyword arguments; their meaning is left to the
                implementation.
        """

    @abstractmethod
    def add_task_if_absent(
        self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
    ) -> bool:
        """
        Add the task only if it does not exist yet.

        Args:
            task (TaskLike): The task to add.
            parent (TaskLike | None): The parent task. Defaults to None.

        Returns:
            bool: Whether the task was added.
        """

    @abstractmethod
    def remove_task(self, task: TaskLike, cascade: bool = False):
        """
        Remove a task.

        Args:
            task (TaskLike): The task to remove.
            cascade (bool, optional): Whether to also remove the sub-tasks.
                Defaults to False.
        """

    @abstractmethod
    def remove_task_if_present(self, task: TaskLike, cascade: bool = False):
        """
        Remove the task if it exists.

        Args:
            task (TaskLike): The task to remove.
            cascade (bool, optional): Whether to also remove the sub-tasks.
                Defaults to False.
        """

    @abstractmethod
    def get_task(self, task: TaskLike) -> Optional[ITask]:
        """
        Look up the task itself from its ID.
        """

    @abstractmethod
    def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
        """
        Get the IDs of all sub-tasks of a task.

        Args:
            task (TaskLike): The task object (or its ID).
            strict (bool, optional): Whether to use strict mode.
                Defaults to True.

        Returns:
            Iterable[TaskId]: The sub-task IDs.
        """

    @abstractmethod
    def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
        """
        Get the IDs of all parent tasks of a task.

        Args:
            task (TaskLike): The task object (or its ID).
            strict (bool, optional): Whether to use strict mode.
                Defaults to True.

        Returns:
            Iterable[TaskId]: The parent-task IDs.
        """

    @abstractmethod
    def has_parent_child_relationship(
        self, a: TaskLike, b: TaskLike, strict: bool = True
    ) -> bool:
        """
        Determine whether task ``a`` is a parent of task ``b``.

        Args:
            a (TaskLike): Task A (the candidate parent).
            b (TaskLike): Task B (the candidate child).
            strict (bool): When True, test for a directed edge from ``a`` to
                ``b``; otherwise test for any path from ``a`` to ``b``.

        Returns:
            bool: Whether ``a`` is a parent of ``b``.
        """

    @abstractmethod
    def update_task_status(self, task: TaskId, status: TaskStatus):
        """
        Update the status of a task.

        Args:
            task (TaskId): The task ID.
            status (TaskStatus): The new status.
        """

add_task abstractmethod

add_task(task: TaskLike, parent: Optional[TaskLike] = None, **attr)

添加任务

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
parent TaskLike | None

父任务对象. Defaults to None.

None
源代码位于: logis/task/manager.py
@abstractmethod
def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
    """
    Add a task.

    Args:
        task (TaskLike): The task to add.
        parent (TaskLike | None): The parent task. Defaults to None.
        **attr: Extra keyword arguments; meaning is left to the implementation.
    """

add_task_if_absent abstractmethod

add_task_if_absent(task: TaskLike, parent: Optional[TaskLike] = None, **attr) -> bool

如果任务不存在则添加,否则不添加

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
parent TaskLike | None

父任务对象. Defaults to None.

None

返回:

名称 类型 描述
bool bool

是否添加成功

源代码位于: logis/task/manager.py
@abstractmethod
def add_task_if_absent(
    self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
) -> bool:
    """
    Add the task only if it does not exist yet.

    Args:
        task (TaskLike): The task to add.
        parent (TaskLike | None): The parent task. Defaults to None.

    Returns:
        bool: Whether the task was added.
    """

get_children_id abstractmethod

get_children_id(task: TaskLike, strict: bool = True) -> Iterable[TaskId]

获取任务的所有子任务ID

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
strict bool

是否严格模式. Defaults to True.

True

返回:

类型 描述
Iterable[TaskId]

Iterable[TaskId]: 子任务ID列表

源代码位于: logis/task/manager.py
@abstractmethod
def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
    """
    Get the IDs of all sub-tasks of a task.

    Args:
        task (TaskLike): The task object (or its ID).
        strict (bool, optional): Whether to use strict mode. Defaults to True.

    Returns:
        Iterable[TaskId]: The sub-task IDs.
    """

get_parents_id abstractmethod

get_parents_id(task: TaskLike, strict: bool = True) -> Iterable[TaskId]

获取任务的所有父任务ID

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
strict bool

是否严格模式. Defaults to True.

True

返回:

类型 描述
Iterable[TaskId]

Iterable[TaskId]: 父任务ID列表

源代码位于: logis/task/manager.py
@abstractmethod
def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
    """
    Get the IDs of all parent tasks of a task.

    Args:
        task (TaskLike): The task object (or its ID).
        strict (bool, optional): Whether to use strict mode. Defaults to True.

    Returns:
        Iterable[TaskId]: The parent-task IDs.
    """

get_task abstractmethod

get_task(task: TaskLike) -> Optional[ITask]

根据任务ID获取任务本身

源代码位于: logis/task/manager.py
@abstractmethod
def get_task(self, task: TaskLike) -> Optional[ITask]:
    """
    Look up the task itself from its ID.
    """

has_parent_child_relationship abstractmethod

has_parent_child_relationship(a: TaskLike, b: TaskLike, strict: bool = True) -> bool

判断任务a是否是任务b的父任务

参数:

名称 类型 描述 默认
a TaskLike

任务A

必需
b TaskLike

任务B

必需
strict bool

是否严格判断. 如果严格判断, 则判断是否存在一条有向边从a指向b. 否则判断是否存在一条从a到b的路径.

True

返回:

名称 类型 描述
bool bool

任务a是否是任务b的父任务(即b是否是a的子任务)

源代码位于: logis/task/manager.py
@abstractmethod
def has_parent_child_relationship(
    self, a: TaskLike, b: TaskLike, strict: bool = True
) -> bool:
    """
    Determine whether task ``a`` is a parent of task ``b``.

    Args:
        a (TaskLike): Task A (the candidate parent).
        b (TaskLike): Task B (the candidate child).
        strict (bool): When True, test for a directed edge from ``a`` to
            ``b``; otherwise test for any path from ``a`` to ``b``.

    Returns:
        bool: Whether ``a`` is a parent of ``b``.
    """

is_status_at

is_status_at(task: TaskLike, status: TaskStatus, infer: bool = True, update: bool = True) -> bool

检查任务是否在指定状态

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
status TaskStatus

状态

必需
infer bool

是否根据子任务的状态推测本任务的状态,默认值为True

True
update bool

是否在检测到任务状态符合要求时自动更新任务属性,默认值为True

True

返回:

名称 类型 描述
bool bool

任务是否在指定状态

源代码位于: logis/task/manager.py
def is_status_at(
    self,
    task: TaskLike,
    status: TaskStatus,
    infer: bool = True,
    update: bool = True,
) -> bool:
    """
    Check whether a task is in the given status.

    Args:
        task (TaskLike): The task object (or its ID).
        status (TaskStatus): The status to test for.
        infer (bool, optional): Whether to infer this task's status from the
            status of its sub-tasks. Defaults to True.
        update (bool, optional): Whether to update the task's attributes
            automatically when its status matches. Defaults to True.

    Returns:
        bool: Whether the task is in the given status.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

is_task_finished abstractmethod

is_task_finished(task: TaskLike, update: bool = True, infer=True) -> bool

检查任务是否已完成

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
update bool

是否在检测到任务已完成时自动更新任务属性

True
infer bool

是否根据子任务的状态推测本任务的状态

True

返回:

名称 类型 描述
bool bool

任务是否已完成

源代码位于: logis/task/manager.py
@abstractmethod
def is_task_finished(
    self, task: TaskLike, update: bool = True, infer: bool = True
) -> bool:
    """
    Check whether a task has finished.

    Args:
        task (TaskLike): The task object (or its ID).
        update (bool, optional): Whether to update the task's attributes
            automatically once it is detected as finished.
        infer (bool, optional): Whether to infer this task's status from the
            status of its sub-tasks.

    Returns:
        bool: Whether the task has finished.
    """

remove_task abstractmethod

remove_task(task: TaskLike, cascade: bool = False)

删除任务

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
cascade bool

是否级联删除子任务. Defaults to False.

False
源代码位于: logis/task/manager.py
@abstractmethod
def remove_task(self, task: TaskLike, cascade: bool = False):
    """
    Remove a task.

    Args:
        task (TaskLike): The task to remove.
        cascade (bool, optional): Whether to also remove the sub-tasks.
            Defaults to False.
    """

remove_task_if_present abstractmethod

remove_task_if_present(task: TaskLike, cascade: bool = False)

如果任务存在则删除

参数:

名称 类型 描述 默认
task TaskLike

任务对象

必需
cascade bool

是否级联删除子任务. Defaults to False.

False
源代码位于: logis/task/manager.py
@abstractmethod
def remove_task_if_present(self, task: TaskLike, cascade: bool = False):
    """
    Remove the task if it exists.

    Args:
        task (TaskLike): The task to remove.
        cascade (bool, optional): Whether to also remove the sub-tasks.
            Defaults to False.
    """

task_size abstractmethod

task_size() -> int

获取任务数量

源代码位于: logis/task/manager.py
@abstractmethod
def task_size(self) -> int:
    """
    Return the number of tasks.
    """

tasks abstractmethod

tasks(only_id: bool = False) -> Iterator[ITask]

获取所有任务

源代码位于: logis/task/manager.py
@abstractmethod
def tasks(self, only_id: bool = False) -> Iterator[ITask]:
    """
    Iterate over all tasks.
    """

update_task_status abstractmethod

update_task_status(task: TaskId, status: TaskStatus)

更新任务状态

参数:

名称 类型 描述 默认
task TaskId

任务ID

必需
status TaskStatus

任务状态

必需
源代码位于: logis/task/manager.py
@abstractmethod
def update_task_status(self, task: TaskId, status: TaskStatus):
    """
    Update the status of a task.

    Args:
        task (TaskId): The task ID.
        status (TaskStatus): The new status.
    """

IHandler

处理接口

源代码位于: logis/iface/handler.py
class IHandler(ABC):
    """Handler interface."""

    @abstractmethod
    def handle(self, *args, **kwargs):
        """Process data; concrete handlers must override this method."""

handle abstractmethod

handle(*args, **kwargs)

处理数据

源代码位于: logis/iface/handler.py
@abstractmethod
def handle(self, *args, **kwargs):
    """Process data; concrete handlers must override this method."""

ITask

任务基础抽象类

源代码位于: logis/task/model/__init__.py
class ITask(metaclass=ABCMeta):
    """
    Abstract base class describing the minimal task interface.
    """

    @abstractmethod
    def get_task_id(self) -> TaskId:
        """
        Return the ID of this task.
        """

    @abstractmethod
    def get_priority(self) -> TaskPriority:
        """
        Return the priority of this task.
        """

    @abstractmethod
    def update_status(self, status: TaskStatus):
        """
        Update the status of this task.
        """

    @abstractmethod
    def is_status_at(self, status: TaskStatus) -> bool:
        """
        Check whether the task's status equals the given value.
        """

    @property
    def finished(self) -> bool:
        """Whether the task is at ``TaskStatus.FINISHED``."""
        return self.is_status_at(TaskStatus.FINISHED)

finished property

finished: bool

get_priority abstractmethod

get_priority() -> TaskPriority

任务优先级

源代码位于: logis/task/model/__init__.py
@abstractmethod
def get_priority(self) -> TaskPriority:
    """
    Return the priority of this task.
    """

get_task_id abstractmethod

get_task_id() -> TaskId
源代码位于: logis/task/model/__init__.py
@abstractmethod
def get_task_id(self) -> TaskId:
    """Return the ID of this task."""

is_status_at abstractmethod

is_status_at(status: TaskStatus) -> bool

检查任务状态是否为指定值

源代码位于: logis/task/model/__init__.py
@abstractmethod
def is_status_at(self, status: TaskStatus) -> bool:
    """
    Check whether the task's status equals the given value.
    """

update_status abstractmethod

update_status(status: TaskStatus)

更新任务状态

源代码位于: logis/task/model/__init__.py
@abstractmethod
def update_status(self, status: TaskStatus):
    """
    Update the status of this task.
    """

ITaskHandler

任务处理接口

源代码位于: logis/task/iface/__init__.py
class ITaskHandler(IHandler):
    """
    Task-handling interface.

    ``on_task_received`` and ``on_task_succeeded`` are abstract; the other
    hooks have default bodies that raise ``NotImplementedError``.
    """

    def get_current_task_concurrency(self, *args, **kwargs):
        """
        Get the number of tasks currently being processed concurrently.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("get_current_task_concurrency")

    def exceed_the_maximum_task_concurrency(self, *args, **kwargs):
        """
        Tell whether the task-concurrency limit has been reached.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("exceed_the_maximum_task_concurrency")

    def get_working_mode(self):
        """
        Get the working (processing) mode.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("get_working_mode")

    def get_task_manifest(self):
        """
        Get the task manifest.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("get_task_manifest")

    def is_task_all_done(self, **kwargs) -> bool:
        """
        Tell whether all tasks are done.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("is_task_all_done")

    @abstractmethod
    def on_task_received(self, *tasks: ITask, **kwargs):
        """
        Callback invoked when tasks are received.
        """

    @abstractmethod
    def on_task_succeeded(self, *args, **kwargs):
        """
        Callback invoked when a task has completed.
        """

    def on_task_failed(self, *args, **kwargs):
        """
        Callback invoked when a task has failed.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("on_task_failed")

exceed_the_maximum_task_concurrency

exceed_the_maximum_task_concurrency(*args, **kwargs)

判断是否达到了任务并发上限

源代码位于: logis/task/iface/__init__.py
def exceed_the_maximum_task_concurrency(self, *args, **kwargs):
    """
    Tell whether the task-concurrency limit has been reached.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "exceed_the_maximum_task_concurrency"
    raise NotImplementedError(method_name)

get_current_task_concurrency

get_current_task_concurrency(*args, **kwargs)

获取当前正在处理的任务并发数

源代码位于: logis/task/iface/__init__.py
def get_current_task_concurrency(self, *args, **kwargs):
    """
    Get the number of tasks currently being processed concurrently.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "get_current_task_concurrency"
    raise NotImplementedError(method_name)

get_task_manifest

get_task_manifest()

获取任务清单

源代码位于: logis/task/iface/__init__.py
def get_task_manifest(self):
    """
    Get the task manifest.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "get_task_manifest"
    raise NotImplementedError(method_name)

get_working_mode

get_working_mode()

获取处理模式

源代码位于: logis/task/iface/__init__.py
def get_working_mode(self):
    """
    Get the working (processing) mode.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "get_working_mode"
    raise NotImplementedError(method_name)

is_task_all_done

is_task_all_done(**kwargs) -> bool

任务是否全部完成

源代码位于: logis/task/iface/__init__.py
def is_task_all_done(self, **kwargs) -> bool:
    """
    Tell whether all tasks are done.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "is_task_all_done"
    raise NotImplementedError(method_name)

on_task_failed

on_task_failed(*args, **kwargs)

任务失败

源代码位于: logis/task/iface/__init__.py
def on_task_failed(self, *args, **kwargs):
    """
    Callback invoked when a task has failed.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    method_name = "on_task_failed"
    raise NotImplementedError(method_name)

on_task_received abstractmethod

on_task_received(*tasks: ITask, **kwargs)

接收任务

源代码位于: logis/task/iface/__init__.py
@abstractmethod
def on_task_received(self, *tasks: ITask, **kwargs):
    """Callback invoked when tasks are received."""

on_task_succeeded abstractmethod

on_task_succeeded(*args, **kwargs)

任务完成

源代码位于: logis/task/iface/__init__.py
@abstractmethod
def on_task_succeeded(self, *args, **kwargs):
    """Callback invoked when a task has completed."""

IWorkingMode

工作模式

源代码位于: logis/task/iface/__init__.py
class IWorkingMode(ABC):
    """Working mode (marker interface with no members of its own)."""

NumberUnit

具有数量和单位的数值类型

源代码位于: logis/data_type/unitable.py
class NumberUnit(metaclass=ABCMeta):
    """
    Numeric value that carries both a quantity and a unit.

    Subclasses must implement ``model_dump`` and ``model_validate``; the
    arithmetic operators use them to build their results.

    Note: ``__eq__`` is overridden without ``__hash__``, so under Python's
    data model instances are unhashable unless a subclass defines
    ``__hash__`` itself.
    """

    quantity: NumberType = 0
    unit: Optional[Unit] = None

    # Custom unit-ratio converter configuration. When it is None, operands
    # must already share the same unit (see __auto_validate__).
    _unit_config_: Optional["UnitConfig"] = None

    def __init__(self, **kwargs):
        """
        Initialise from keyword arguments.

        Args:
            **kwargs: ``quantity`` (defaults to 0) and ``unit`` (defaults to
                None) are read; any other keys are ignored here.
        """
        self.quantity = kwargs.get("quantity", 0)
        self.unit = kwargs.get("unit", None)

    @property
    def kind(self):
        """
        Return the class name by default; subclasses may override this.
        """
        return self.__class__.__name__

    @abstractmethod
    def model_dump(self, *args, **kwargs) -> Dict[str, Any]:
        """Serialise this value to a dict (must contain ``quantity``)."""

    @classmethod
    @abstractmethod
    def model_validate(cls, data: Dict[str, Any], **kwargs) -> Self:
        """Build an instance of ``cls`` from a dict such as ``model_dump`` returns."""

    def __auto_validate__(self, other: "NumberUnit"):
        """
        Return ``other`` in a form whose quantity can be combined with ours.

        Without a unit config the units must be equal and ``other`` is
        returned unchanged; otherwise ``other`` is passed through
        ``unify_quantified_value`` targeting ``self.unit``.

        Raises:
            AssertionError: If no unit config is set and the units differ.
        """
        if self._unit_config_ is None:
            assert (
                self.unit == other.unit
            ), f"unit must be the same, but {self.unit} != {other.unit}"
        else:
            other = unify_quantified_value(
                other, self.unit, unit_config=self._unit_config_
            )
        return other

    def __sub__(self, other: "NumberUnit") -> Self:
        """Return a new value whose quantity is ``self - other``."""
        other = self.__auto_validate__(other)
        dc = self.model_dump()
        dc["quantity"] = self.quantity - other.quantity
        return type(self).model_validate(dc)

    def __add__(self, other: "NumberUnit") -> Self:
        """Return a new value whose quantity is ``self + other``."""
        other = self.__auto_validate__(other)
        dc = self.model_dump()
        dc["quantity"] = self.quantity + other.quantity
        # Built through model_validate, exactly like __sub__ and __mul__.
        return type(self).model_validate(dc)

    def __truediv__(self, other: "NumberUnit") -> NumberType:
        """
        Return the plain numeric ratio of the two quantities.

        The result is a bare number, not a ``NumberUnit``.
        """
        other = self.__auto_validate__(other)
        # TODO: handle precision issues
        return self.quantity / other.quantity

    def __mul__(self, times: Union[int, float]) -> Self:
        """Return a new value whose quantity is scaled by ``times``."""
        data = self.model_dump()
        data["quantity"] *= times
        return type(self).model_validate(data)

    def __gt__(self, other: "NumberUnit"):
        """Return whether our quantity is greater than ``other``'s."""
        other = self.__auto_validate__(other)
        return self.quantity > other.quantity

    def __lt__(self, other: "NumberUnit"):
        """Return whether our quantity is less than ``other``'s."""
        other = self.__auto_validate__(other)
        return self.quantity < other.quantity

    def __eq__(self, other: "NumberUnit"):
        """
        Return whether the two quantities are equal.

        Falsy operands (e.g. ``None``) compare unequal. Operands that carry
        no ``quantity``/``unit`` attributes yield ``NotImplemented`` so that
        Python falls back to its default comparison instead of raising
        ``AttributeError``.
        """
        if not other:
            return False
        if not (hasattr(other, "quantity") and hasattr(other, "unit")):
            return NotImplemented
        other = self.__auto_validate__(other)
        return self.quantity == other.quantity

    def __ge__(self, other: "NumberUnit"):
        """Return whether our quantity is greater than or equal to ``other``'s."""
        other = self.__auto_validate__(other)
        return self.quantity >= other.quantity

    def __le__(self, other: "NumberUnit"):
        """Return whether our quantity is less than or equal to ``other``'s."""
        other = self.__auto_validate__(other)
        return self.quantity <= other.quantity

    @property
    def value(self):
        """Alias for ``quantity`` (readable and writable)."""
        return self.quantity

    @value.setter
    def value(self, new_value):
        self.quantity = new_value

    @classmethod
    def parse_tuple(cls, input: Tuple[NumberType, Unit]) -> Self:
        """
        Parse data shaped like ``[1, "个"]`` (quantity first, unit second).
        """
        return cls(quantity=input[0], unit=input[1])

    @classmethod
    def parse_str(cls, input: str, delimiter="|", number_type=float) -> Optional[Self]:
        """
        Parse data shaped like ``1|m/s``.

        Args:
            input (str): The text to parse; surrounding whitespace is stripped.
            delimiter (str): Separator between quantity and unit.
            number_type: Callable converting the quantity text to a number.

        Returns:
            Optional[Self]: The parsed value, or None when ``input`` is empty.

        Raises:
            AssertionError: If splitting does not give exactly two parts.
        """
        if not input:
            return None
        tmps = input.strip().split(delimiter)
        assert len(tmps) == 2, "unexpected format: " + input
        return cls(quantity=number_type(tmps[0]), unit=tmps[1])

    @classmethod
    def of(cls, num: NumberType, unit: Optional[str] = None) -> Self:
        """Build an instance from a quantity and an optional unit."""
        return cls(quantity=num, unit=unit)

    def increase(self, num: NumberType):
        """Increase the quantity in place by ``num``."""
        self.quantity += num

    def decrease(self, num: NumberType):
        """Decrease the quantity in place by ``num``."""
        self.quantity -= num

kind property

kind

默认情况下返回类名,实际使用时可以重写

quantity class-attribute instance-attribute

quantity: NumberType = get('quantity', 0)

unit class-attribute instance-attribute

unit: Optional[Unit] = get('unit', None)

value property writable

value

__add__

__add__(other: NumberUnit) -> Self
源代码位于: logis/data_type/unitable.py
def __add__(self, other: "NumberUnit") -> Self:
    """
    Return a new value whose quantity is ``self + other``.

    ``other`` is first normalised through ``__auto_validate__``; the result
    is built with ``model_validate``, the same way ``__sub__`` and
    ``__mul__`` build theirs.
    """
    other = self.__auto_validate__(other)
    dc = self.model_dump()
    dc["quantity"] = self.quantity + other.quantity
    return type(self).model_validate(dc)

__auto_validate__

__auto_validate__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __auto_validate__(self, other: "NumberUnit"):
    """
    Return ``other`` in a form whose quantity can be combined with ours.

    With a unit config, ``other`` is passed through
    ``unify_quantified_value`` targeting ``self.unit``; without one the
    units must already be equal and ``other`` is returned unchanged.

    Raises:
        AssertionError: If no unit config is set and the units differ.
    """
    if self._unit_config_ is not None:
        return unify_quantified_value(
            other, self.unit, unit_config=self._unit_config_
        )
    assert (
        self.unit == other.unit
    ), f"unit must be the same, but {self.unit} != {other.unit}"
    return other

__eq__

__eq__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __eq__(self, other: "NumberUnit"):
    """
    Return whether the two quantities are equal.

    Falsy operands (e.g. ``None``) compare unequal. Operands that carry no
    ``quantity``/``unit`` attributes yield ``NotImplemented`` so that Python
    falls back to its default comparison instead of raising
    ``AttributeError``.
    """
    if not other:
        return False
    if not (hasattr(other, "quantity") and hasattr(other, "unit")):
        return NotImplemented
    other = self.__auto_validate__(other)
    return self.quantity == other.quantity

__ge__

__ge__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __ge__(self, other: "NumberUnit"):
    """Return whether our quantity is greater than or equal to ``other``'s."""
    normalized = self.__auto_validate__(other)
    return self.quantity >= normalized.quantity

__gt__

__gt__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __gt__(self, other: "NumberUnit"):
    """Return whether our quantity is strictly greater than ``other``'s."""
    normalized = self.__auto_validate__(other)
    return self.quantity > normalized.quantity

__init__

__init__(**kwargs)
源代码位于: logis/data_type/unitable.py
def __init__(self, **kwargs):
    """
    Initialise from keyword arguments.

    ``quantity`` defaults to 0 and ``unit`` defaults to None; other keys are
    ignored here.
    """
    self.quantity = kwargs.get("quantity", 0)
    self.unit = kwargs.get("unit")

__le__

__le__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __le__(self, other: "NumberUnit"):
    """Return whether our quantity is less than or equal to ``other``'s."""
    normalized = self.__auto_validate__(other)
    return self.quantity <= normalized.quantity

__lt__

__lt__(other: NumberUnit)
源代码位于: logis/data_type/unitable.py
def __lt__(self, other: "NumberUnit"):
    """Return whether our quantity is strictly less than ``other``'s."""
    normalized = self.__auto_validate__(other)
    return self.quantity < normalized.quantity

__mul__

__mul__(times: Union[int, float]) -> Self
源代码位于: logis/data_type/unitable.py
def __mul__(self, times: Union[int, float]) -> Self:
    """
    Return a new value whose dumped quantity is scaled by ``times``.

    The result is built by ``model_validate`` on the modified dump.
    """
    payload = self.model_dump()
    payload["quantity"] *= times
    return type(self).model_validate(payload)

__sub__

__sub__(other: NumberUnit) -> Self
源代码位于: logis/data_type/unitable.py
def __sub__(self, other: "NumberUnit") -> Self:
    """
    Return a new value whose quantity is ``self - other``.

    ``other`` is first normalised through ``__auto_validate__``.
    """
    subtrahend = self.__auto_validate__(other)
    payload = self.model_dump()
    payload["quantity"] = self.quantity - subtrahend.quantity
    return type(self).model_validate(payload)

__truediv__

__truediv__(other: NumberUnit) -> Self
源代码位于: logis/data_type/unitable.py
def __truediv__(self, other: "NumberUnit") -> NumberType:
    """
    Return the plain numeric ratio of the two quantities.

    ``other`` is first normalised through ``__auto_validate__``. The result
    is a bare number, not a ``NumberUnit``.
    """
    other = self.__auto_validate__(other)
    # TODO: handle precision issues
    return self.quantity / other.quantity

decrease

decrease(num: NumberType)
源代码位于: logis/data_type/unitable.py
def decrease(self, num: NumberType):
    """Decrease the quantity in place by ``num``."""
    self.quantity -= num

increase

increase(num: NumberType)
源代码位于: logis/data_type/unitable.py
def increase(self, num: NumberType):
    """Increase the quantity in place by ``num``."""
    self.quantity += num

model_dump abstractmethod

model_dump(*args, **kwargs) -> Dict[str, Any]
源代码位于: logis/data_type/unitable.py
@abstractmethod
def model_dump(self, *args, **kwargs) -> Dict[str, Any]:
    """Serialise this value to a dict; must be provided by subclasses."""

model_validate abstractmethod classmethod

model_validate(data: Dict[str, Any], **kwargs) -> Self
源代码位于: logis/data_type/unitable.py
@classmethod
@abstractmethod
def model_validate(cls, data: Dict[str, Any], **kwargs) -> Self:
    """Build an instance of ``cls`` from a dict; must be provided by subclasses."""

of classmethod

of(num: NumberType, unit: Optional[str] = None) -> Self
源代码位于: logis/data_type/unitable.py
@classmethod
def of(cls, num: NumberType, unit: Optional[str] = None) -> Self:
    """Build an instance of ``cls`` from a quantity and an optional unit."""
    instance = cls(quantity=num, unit=unit)
    return instance

parse_str classmethod

parse_str(input: str, delimiter='|', number_type=float) -> Optional[Self]

解析形如1|m/s的数据

源代码位于: logis/data_type/unitable.py
@classmethod
def parse_str(cls, input: str, delimiter="|", number_type=float) -> Optional[Self]:
    """
    Parse data shaped like ``1|m/s``.

    Args:
        input (str): The text to parse; surrounding whitespace is stripped.
        delimiter (str): Separator between quantity and unit.
        number_type: Callable converting the quantity text to a number.

    Returns:
        Optional[Self]: The parsed value, or None when ``input`` is empty.

    Raises:
        AssertionError: If splitting does not give exactly two parts.
    """
    if not input:
        return None
    parts = input.strip().split(delimiter)
    assert len(parts) == 2, "unexpected format: " + input
    raw_quantity, unit_text = parts
    return cls(quantity=number_type(raw_quantity), unit=unit_text)

parse_tuple classmethod

parse_tuple(input: Tuple[NumberType, Unit]) -> Self

解析形如[1,"个"]的数据

源代码位于: logis/data_type/unitable.py
@classmethod
def parse_tuple(cls, input: Tuple[NumberType, Unit]) -> Self:
    """
    Parse data shaped like ``[1, "个"]`` (quantity first, unit second).
    """
    quantity, unit = input[0], input[1]
    return cls(quantity=quantity, unit=unit)

QuantifiedTask

可量化的任务

源代码位于: logis/task/model/__init__.py
class QuantifiedTask(NumberUnit, ITask):
    """
    A task whose amount of work is quantifiable (quantity plus unit).
    """

    # Overall progress, used when no pipeline ID is given.
    progress: Number = 0

    def __init__(self, **kwargs):
        """Initialise the quantity/unit and an empty per-pipeline progress counter."""
        super().__init__(**kwargs)
        self.pipeline_progress: Dict[str, Number] = Counter()

    def update_progress(
        self,
        progress: Union[NumberUnit, Number],
        delta: bool = False,
        pipeline_id: Optional[str] = None,
    ):
        """
        Update the progress of the task.

        Args:
            progress (NumberUnit | Number): The progress value. A
                ``NumberUnit`` must use the same unit as the task; only its
                quantity is used.
            delta (bool, optional): When True, add ``progress`` to the
                current value; otherwise overwrite it. Defaults to False.
            pipeline_id (str | None): When given, update that pipeline's
                entry in ``pipeline_progress`` instead of the overall
                ``progress``. Defaults to None.

        Raises:
            AssertionError: If a ``NumberUnit`` with a different unit is passed.
        """
        if isinstance(progress, NumberUnit):
            assert (
                progress.unit == self.unit
            ), f"进度单位{progress.unit}与任务单位{self.unit}不一致,请在调用处处理"
            progress = progress.quantity

        # No pipeline given: the overall progress is the target.
        if pipeline_id is None:
            if delta:
                self.progress += progress
            else:
                self.progress = progress
            return

        if delta:
            self.pipeline_progress[pipeline_id] += progress
        else:
            self.pipeline_progress[pipeline_id] = progress

pipeline_progress instance-attribute

pipeline_progress: Dict[str, Number] = Counter()

progress class-attribute instance-attribute

progress: Union[Number] = 0

__init__

__init__(**kwargs)
源代码位于: logis/task/model/__init__.py
def __init__(self, **kwargs):
    """Initialise the parent classes with ``kwargs`` and create an empty per-pipeline progress counter."""
    super().__init__(**kwargs)
    # Counter: a pipeline that has not been seen yet reads as 0, so delta updates can use += directly.
    self.pipeline_progress: Dict[str, Number] = Counter()

update_progress

update_progress(progress: Union[NumberUnit, Number], delta: bool = False, pipeline_id: Optional[str] = None)

更新进度

Args:

源代码位于: logis/task/model/__init__.py
def update_progress(
    self,
    progress: Union[NumberUnit, Number],
    delta: bool = False,
    pipeline_id: Optional[str] = None,
):
    """
    Update the progress of the task.

    Args:
        progress (NumberUnit | Number): The progress value. A ``NumberUnit``
            must use the same unit as the task; only its quantity is used.
        delta (bool, optional): When True, add ``progress`` to the current
            value; otherwise overwrite it. Defaults to False.
        pipeline_id (str | None): When given, update that pipeline's entry
            in ``pipeline_progress`` instead of the overall ``progress``.
            Defaults to None.

    Raises:
        AssertionError: If a ``NumberUnit`` with a different unit is passed.
    """
    if isinstance(progress, NumberUnit):
        assert (
            progress.unit == self.unit
        ), f"进度单位{progress.unit}与任务单位{self.unit}不一致,请在调用处处理"
        progress = progress.quantity

    # No pipeline given: the overall progress is the target.
    if pipeline_id is None:
        if delta:
            self.progress += progress
        else:
            self.progress = progress
        return

    if delta:
        self.pipeline_progress[pipeline_id] += progress
    else:
        self.pipeline_progress[pipeline_id] = progress

Task

任务基础结构

源代码位于: logis/task/model/__init__.py
class Task(BaseModel, ITask):
    """
    Basic task data structure implementing the ``ITask`` interface.
    """

    # Identity and position in the task hierarchy.
    id: Optional[TaskId] = None
    parent_id: Optional[TaskId] = None
    children_ids: Iterable[TaskId] = Field(default_factory=list)

    # Descriptive attributes.
    name: Optional[str] = None
    type: Optional[str] = None
    priority: Optional[TaskPriority] = None

    # NOTE(review): `repeat` presumably is a repetition count — confirm with callers.
    repeat: int = 1
    progress: Union[float, int] = 0
    tags: Iterable[str] = Field(default_factory=set)

    # Lifecycle timestamps stored as integers (unit/epoch not visible here — TODO confirm).
    created_at: Optional[int] = None
    updated_at: Optional[int] = None
    started_at: Optional[int] = None
    finished_at: Optional[int] = None
    cancelled_at: Optional[int] = None

    # Current status (read and written by the ITask methods below) and stage.
    status: Optional[TaskStatus] = None
    stage: Optional[str] = None

    # Free-form remark.
    remark: Optional[str] = None

    def get_task_id(self) -> TaskId:
        """Return the ``id`` field (None when it has not been set)."""
        return self.id

    def get_priority(self) -> TaskPriority:
        """Return the ``priority`` field (None when it has not been set)."""
        return self.priority

    def update_status(self, status: TaskStatus):
        """Overwrite the ``status`` field with ``status``."""
        self.status = status

    def is_status_at(self, status: TaskStatus) -> bool:
        """Return whether the current ``status`` equals ``status``."""
        return self.status == status

cancelled_at class-attribute instance-attribute

cancelled_at: Optional[int] = None

children_ids class-attribute instance-attribute

children_ids: Iterable[TaskId] = Field(default_factory=list)

created_at class-attribute instance-attribute

created_at: Optional[int] = None

finished_at class-attribute instance-attribute

finished_at: Optional[int] = None

id class-attribute instance-attribute

id: Optional[TaskId] = None

name class-attribute instance-attribute

name: Optional[str] = None

parent_id class-attribute instance-attribute

parent_id: Optional[TaskId] = None

priority class-attribute instance-attribute

priority: Optional[TaskPriority] = None

progress class-attribute instance-attribute

progress: Union[float, int] = 0

remark class-attribute instance-attribute

remark: Optional[str] = None

repeat class-attribute instance-attribute

repeat: int = 1

stage class-attribute instance-attribute

stage: Optional[str] = None

started_at class-attribute instance-attribute

started_at: Optional[int] = None

status class-attribute instance-attribute

status: Optional[TaskStatus] = None

tags class-attribute instance-attribute

tags: Iterable[str] = Field(default_factory=set)

type class-attribute instance-attribute

type: Optional[str] = None

updated_at class-attribute instance-attribute

updated_at: Optional[int] = None

get_priority

get_priority() -> TaskPriority
源代码位于: logis/task/model/__init__.py
def get_priority(self) -> TaskPriority:
    """Return the ``priority`` field (None when it has not been set)."""
    current_priority = self.priority
    return current_priority

get_task_id

get_task_id() -> TaskId
源代码位于: logis/task/model/__init__.py
def get_task_id(self) -> TaskId:
    """Return the ``id`` field (None when it has not been set)."""
    task_id = self.id
    return task_id

is_status_at

is_status_at(status: TaskStatus) -> bool
源代码位于: logis/task/model/__init__.py
def is_status_at(self, status: TaskStatus) -> bool:
    """Return whether the current ``status`` equals ``status``."""
    matches = self.status == status
    return matches

update_status

update_status(status: TaskStatus)
源代码位于: logis/task/model/__init__.py
def update_status(self, status: TaskStatus):
    """Overwrite the ``status`` field with ``status``."""
    setattr(self, "status", status)

TaskGraph

以任务id作为节点id组成的任务树,内部数据结构是一个有向无环图

源代码位于: logis/task/manager.py
class TaskGraph(AbstractTaskManager):
    """
    Task tree whose node IDs are task IDs, stored as a directed graph.

    Parent/child relationships are directed edges (parent -> child) in a
    ``DiGraph``. The graph is meant to be acyclic, but only self-parenting
    is rejected here. Whatever was passed to ``add_task`` is kept in a
    separate ``task_id -> task`` mapping.
    """

    __KEY_TASK__ = "task"

    def __init__(self, **attr):
        """Create an empty task graph. ``attr`` is accepted but not used."""
        self.__graph__ = DiGraph()
        # A plain dict: the previous factory-less defaultdict behaved the same.
        self.__id_task_map__ = {}

    def parse_task_id(self, task: TaskLike) -> TaskId:
        """
        Normalise a task-like value to a task ID.

        Args:
            task (TaskLike): A task ID (``str``/``int``) or an ``ITask``.

        Returns:
            TaskId: The ID itself, or ``task.get_task_id()`` for an ``ITask``.

        Raises:
            ValueError: If ``task`` is neither an ID nor an ``ITask``.
        """
        if isinstance(task, (str, int)):
            return task
        if isinstance(task, ITask):
            return task.get_task_id()
        raise ValueError(f"Unknown task type {type(task)}")

    def task_size(self) -> int:
        """
        Return the number of tasks (graph nodes).
        """
        return self.__graph__.number_of_nodes()

    def tasks(self, only_id: bool = False) -> Iterator[ITask]:
        """
        Iterate over all tasks.

        Args:
            only_id (bool, optional): When True, yield node IDs instead of
                the result of ``get_task``. Defaults to False.
        """
        for node_id in self.__graph__.nodes:
            yield node_id if only_id else self.get_task(node_id)

    def get_task(self, task_id: TaskId) -> Optional[ITask]:
        """
        Look up a task by ID.

        The id->task mapping is consulted first; otherwise the ``"task"``
        attribute of the graph node is returned, or None if there is none.
        """
        task = self.__id_task_map__.get(task_id)
        # Identity check so that a stored-but-falsy task is still returned.
        if task is not None:
            return task
        node = self.__graph__.nodes.get(task_id)
        return node.get(self.__KEY_TASK__) if node else None

    def find_by(self):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError("TaskGraph.find_by not implemented")

    def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
        """
        Add a task and, optionally, link it below a parent.

        Args:
            task (TaskLike): The task to add.
            parent (TaskLike | None): The parent task; it is added first if
                it is not in the graph yet. Defaults to None.
            **attr: Stored as attributes of the graph node.

        Raises:
            AssertionError: If ``parent`` resolves to the task's own ID.
        """
        task_id = self.parse_task_id(task)
        # `is not None` (rather than truthiness) so that a valid parent ID
        # such as 0 is not silently ignored.
        has_parent = parent is not None
        if has_parent:
            parent_id = self.parse_task_id(parent)
            # Validate before touching any state so that a rejected call
            # leaves the graph unchanged.
            assert (
                parent_id != task_id
            ), f"parent task {parent_id} and task itself {task_id} should be different"
        self.__id_task_map__[task_id] = task
        self.__graph__.add_node(task_id, **attr)
        if has_parent:
            self.add_task_if_absent(parent)
            self.__graph__.add_edge(parent_id, task_id)

    def add_task_if_absent(
        self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
    ) -> bool:
        """
        Add the task only if its ID is not in the graph yet.

        Returns:
            bool: True if the task was added, False if it already existed.
        """
        task_id = self.parse_task_id(task)
        already_present = self.__graph__.has_node(task_id)
        if not already_present:
            self.add_task(task, parent=parent, **attr)
        return not already_present

    def remove_task(self, task: TaskLike, cascade: bool = False, **kwargs):
        """
        Remove a task from the graph and from the id->task mapping.

        Args:
            task (TaskLike): The task to remove.
            cascade (bool, optional): Whether to remove the direct children
                recursively as well. Defaults to False.
        """
        task_id = self.parse_task_id(task)
        if cascade:
            for child_id in list(self.get_children_id(task, strict=True)):
                # A child reachable through several paths may already have
                # been removed by an earlier recursive call.
                if self.__graph__.has_node(child_id):
                    self.remove_task(child_id, cascade=cascade, **kwargs)
        self.__graph__.remove_node(task_id)
        # Drop the cached object too, so get_task stops returning it.
        self.__id_task_map__.pop(task_id, None)

    def remove_task_if_present(
        self, task: TaskLike, cascade: bool = False, **kwargs
    ) -> bool:
        """
        Remove the task if its ID is in the graph.

        Returns:
            bool: True if the task existed (and was removed), else False.
        """
        task_id = self.parse_task_id(task)
        has = self.__graph__.has_node(task_id)
        if has:
            self.remove_task(task, cascade=cascade, **kwargs)
        return has

    def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterator[TaskId]:
        """
        Return the IDs of the task's direct children (graph successors).

        Raises:
            NotImplementedError: If ``strict`` is False.
        """
        if strict:
            return self.__graph__.successors(self.parse_task_id(task))
        raise NotImplementedError()

    def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
        """
        Return the IDs of the task's direct parents (graph predecessors).

        Raises:
            NotImplementedError: If ``strict`` is False.
        """
        if strict:
            return self.__graph__.predecessors(self.parse_task_id(task))
        raise NotImplementedError()

    def has_parent_child_relationship(
        self, a: TaskLike, b: TaskLike, strict: bool = True
    ) -> bool:
        """
        Determine whether task ``a`` is a parent of task ``b``.

        With ``strict`` the test is for a directed edge ``a -> b``;
        otherwise for any path from ``a`` to ``b``.
        """
        a_id = self.parse_task_id(a)
        b_id = self.parse_task_id(b)
        return (
            self.__graph__.has_edge(a_id, b_id)
            if strict
            else has_path(self.__graph__, a_id, b_id)
        )

    def __contains__(self, task: TaskLike) -> bool:
        """
        Return whether the task's ID is a node of the graph.
        """
        return self.__graph__.has_node(self.parse_task_id(task))

    def update_task_status(self, task: TaskId, status: TaskStatus):
        """
        Set the status of a task to the given value.

        Raises:
            AssertionError: If no task is registered under the ID.
        """
        task_id = self.parse_task_id(task)
        task: ITask = self.get_task(task_id)
        assert task, f"任务 {task_id} 不存在"
        task.update_status(status)

    def is_status_at(self, task, status, infer=True, update=True):
        """
        Check whether a task is in the given status.

        If the task itself does not match and ``infer`` is True, the task
        counts as matching when it has children and all of its direct
        children match (checked recursively). With ``update`` the task's
        status is then set to ``status``.
        """
        task_id = self.parse_task_id(task)
        task: ITask = self.get_task(task_id)

        match = task.is_status_at(status)
        if match is True:
            return match
        if not infer:
            return match

        children_ids = list(self.get_children_id(task_id, strict=True))
        # A task without children can never be inferred as matching.
        all_children_match = (
            False
            if not children_ids
            else all(
                self.is_status_at(child_id, status, infer=infer, update=update)
                for child_id in children_ids
            )
        )
        if all_children_match and update:
            self.update_task_status(task_id, status)
        return all_children_match

    def is_task_finished(self, task: TaskLike, update: bool = True, infer=True):
        """Shortcut for ``is_status_at(task, TaskStatus.FINISHED, ...)``."""
        return self.is_status_at(task, TaskStatus.FINISHED, infer=infer, update=update)

__KEY_TASK__ class-attribute instance-attribute

__KEY_TASK__ = 'task'

__graph__ instance-attribute

__graph__ = DiGraph()

__id_task_map__ instance-attribute

__id_task_map__ = defaultdict()

__contains__

__contains__(task: TaskLike) -> bool

检查任务是否存在

源代码位于: logis/task/manager.py
def __contains__(self, task: TaskLike) -> bool:
    """
    Check whether the task is present as a node of the graph.
    """
    node_id = self.parse_task_id(task)
    return self.__graph__.has_node(node_id)

__init__

__init__(**attr)
源代码位于: logis/task/manager.py
def __init__(self, **attr):
    """
    Create an empty manager: an empty id-to-task map and an empty ``DiGraph``.

    ``attr`` is accepted but not used by this initializer.
    """
    self.__id_task_map__ = defaultdict()
    self.__graph__ = DiGraph()

add_task

add_task(task: TaskLike, parent: Optional[TaskLike] = None, **attr)
源代码位于: logis/task/manager.py
def add_task(self, task: TaskLike, parent: Optional[TaskLike] = None, **attr):
    """
    Register a task as a graph node and optionally link it under a parent.

    Args:
        task: Task object or task id to add.
        parent: Optional parent task object or id. Only ``None`` means
            "no parent", so falsy ids such as ``0`` are honoured.
        **attr: Attributes stored on the graph node.

    Raises:
        AssertionError: If ``parent`` resolves to the same id as ``task``.
            The check runs before anything is modified.
    """
    task_id = self.parse_task_id(task)
    has_parent = parent is not None
    parent_id = None
    if has_parent:
        parent_id = self.parse_task_id(parent)
        # Validate first so a rejected call leaves the graph and map untouched.
        assert (
            parent_id != task_id
        ), f"parent task {parent_id} and task itself {task_id} should be different"

    self.__id_task_map__[task_id] = task
    self.__graph__.add_node(task_id, **attr)
    if has_parent:
        # Make sure the parent exists as a node before linking parent -> child.
        self.add_task_if_absent(parent)
        self.__graph__.add_edge(parent_id, task_id)

add_task_if_absent

add_task_if_absent(task: TaskLike, parent: Optional[TaskLike] = None, **attr) -> bool
源代码位于: logis/task/manager.py
def add_task_if_absent(
    self, task: TaskLike, parent: Optional[TaskLike] = None, **attr
) -> bool:
    """
    Add the task only when its id is not yet a node of the graph.

    Returns:
        bool: The ``has_node`` result taken before any insertion, i.e.
        truthy when the task already existed (nothing was added) and falsy
        when it was added by this call.
    """
    already_present = self.__graph__.has_node(self.parse_task_id(task))
    if already_present:
        return already_present
    self.add_task(task, parent=parent, **attr)
    return already_present

find_by

find_by()
源代码位于: logis/task/manager.py
def find_by(self):
    """
    Placeholder; always raises ``NotImplementedError``.
    """
    message = "TaskGraph.find_by not implemented"
    raise NotImplementedError(message)

get_children_id

get_children_id(task: TaskLike, strict: bool = True) -> Iterator[TaskId]
源代码位于: logis/task/manager.py
def get_children_id(self, task: TaskLike, strict: bool = True) -> Iterator[TaskId]:
    """
    Return the ids of the task's direct successors in the graph.

    Args:
        task: Task object or task id.
        strict: Must be truthy; the non-strict mode is not implemented.

    Raises:
        NotImplementedError: If ``strict`` is falsy.
    """
    if not strict:
        raise NotImplementedError()
    node_id = self.parse_task_id(task)
    return self.__graph__.successors(node_id)

get_parents_id

get_parents_id(task: TaskLike, strict: bool = True) -> Iterable[TaskId]
源代码位于: logis/task/manager.py
def get_parents_id(self, task: TaskLike, strict: bool = True) -> Iterable[TaskId]:
    """
    Return the ids of the task's direct predecessors in the graph.

    Args:
        task: Task object or task id.
        strict: Must be truthy; the non-strict mode is not implemented.

    Raises:
        NotImplementedError: If ``strict`` is falsy.
    """
    if not strict:
        raise NotImplementedError()
    node_id = self.parse_task_id(task)
    return self.__graph__.predecessors(node_id)

get_task

get_task(task_id: TaskId) -> Optional[ITask]
源代码位于: logis/task/manager.py
def get_task(self, task_id: TaskId) -> Optional[ITask]:
    """
    Look up a task by id.

    The id-to-task map is consulted first. When it holds nothing truthy for
    the id, the graph node's attributes are checked for the
    ``__KEY_TASK__`` entry instead.

    Returns:
        The stored task, or ``None`` when neither lookup yields one.
    """
    mapped = self.__id_task_map__.get(task_id)
    if not mapped:
        node_attrs = self.__graph__.nodes.get(task_id)
        if not node_attrs:
            return None
        return node_attrs.get(self.__KEY_TASK__)
    return mapped

has_parent_child_relationship

has_parent_child_relationship(a: TaskLike, b: TaskLike, strict: bool = True) -> bool
源代码位于: logis/task/manager.py
def has_parent_child_relationship(
    self, a: TaskLike, b: TaskLike, strict: bool = True
) -> bool:
    """
    Tell whether ``a`` is an ancestor of ``b`` in the task graph.

    Args:
        a: Candidate parent (task object or task id).
        b: Candidate child (task object or task id).
        strict: When truthy, only a direct edge ``a -> b`` counts;
            otherwise any path from ``a`` to ``b`` found by ``has_path``
            counts.

    Returns:
        bool: Result of the edge or path lookup.
    """
    source_id = self.parse_task_id(a)
    target_id = self.parse_task_id(b)
    if not strict:
        # Non-strict mode: reachability through intermediate tasks.
        return has_path(self.__graph__, source_id, target_id)
    return self.__graph__.has_edge(source_id, target_id)

is_status_at

is_status_at(task, status, infer=True, update=True)
源代码位于: logis/task/manager.py
def is_status_at(self, task, status, infer=True, update=True):
    """
    Check whether a task is at ``status``, optionally inferring it from
    its direct children.

    Args:
        task: Task object or task id.
        status: Status to compare against.
        infer: When falsy, only the task's own ``is_status_at`` answer is
            returned and children are never consulted.
        update: When truthy and every direct child matches, the task's
            own status is set to ``status`` via ``update_task_status``.

    Returns:
        The task's own answer when it is ``True`` or when ``infer`` is
        falsy; otherwise ``True`` only if the task has at least one direct
        child and all of them (recursively) match.
    """
    task_id = self.parse_task_id(task)
    # NOTE(review): get_task may return None for an unknown id, in which
    # case the attribute access below fails.
    own_answer = self.get_task(task_id).is_status_at(status)
    if own_answer is True or not infer:
        return own_answer

    child_ids = list(self.get_children_id(task_id, strict=True))
    if not child_ids:
        # A leaf cannot inherit a status from children.
        return False
    for child_id in child_ids:
        # Stop at the first child that does not match.
        if not self.is_status_at(child_id, status, infer=infer, update=update):
            return False
    if update:
        self.update_task_status(task_id, status)
    return True

is_task_finished

is_task_finished(task: TaskLike, update: bool = True, infer=True)
源代码位于: logis/task/manager.py
def is_task_finished(self, task: TaskLike, update: bool = True, infer=True):
    """
    Check whether a task is finished.

    Delegates to ``is_status_at`` with ``TaskStatus.FINISHED``; ``update``
    and ``infer`` are passed through unchanged.
    """
    finished = TaskStatus.FINISHED
    return self.is_status_at(task, finished, update=update, infer=infer)

parse_task_id

parse_task_id(task: TaskLike) -> TaskId
源代码位于: logis/task/manager.py
def parse_task_id(self, task: TaskLike) -> TaskId:
    """
    Normalise a task-like value to its task id.

    Args:
        task: A ``str``/``int`` id (returned as is) or an ``ITask``
            (its ``get_task_id()`` is returned).

    Raises:
        ValueError: If ``task`` is neither an id nor an ``ITask``.
    """
    if not isinstance(task, (str, int)):
        if not isinstance(task, ITask):
            raise ValueError(f"Unknown task type {type(task)}")
        return task.get_task_id()
    return task

remove_task

remove_task(task: TaskLike, cascade: bool = False, **kwargs)
源代码位于: logis/task/manager.py
def remove_task(self, task: TaskLike, cascade: bool = False, **kwargs):
    """
    Remove a task from the graph and from the id-to-task map.

    Args:
        task: Task object or task id to remove.
        cascade: When truthy, direct children are removed first,
            recursively, with the same ``cascade`` and ``kwargs``.
        **kwargs: Forwarded unchanged to the recursive calls.

    Raises:
        Whatever the graph's ``remove_node`` raises when the task itself is
        not a node of the graph.
    """
    task_id = self.parse_task_id(task)
    if cascade:
        # Snapshot the children: removal mutates the graph being iterated.
        for child_id in list(self.get_children_id(task_id, strict=True)):
            # A child reachable through more than one path may already have
            # been removed by an earlier recursive call; skip it.
            if self.__graph__.has_node(child_id):
                self.remove_task(child_id, cascade=cascade, **kwargs)
    self.__graph__.remove_node(task_id)
    # Drop the cached task too, otherwise get_task keeps returning it.
    self.__id_task_map__.pop(task_id, None)

remove_task_if_present

remove_task_if_present(task: TaskLike, cascade: bool = False, **kwargs) -> bool
源代码位于: logis/task/manager.py
def remove_task_if_present(
    self, task: TaskLike, cascade: bool = False, **kwargs
) -> bool:
    """
    Remove the task only when its id is currently a node of the graph.

    Returns:
        bool: The ``has_node`` result taken before removal, i.e. truthy
        when the task existed and was removed, falsy when nothing was done.
    """
    was_present = self.__graph__.has_node(self.parse_task_id(task))
    if not was_present:
        return was_present
    self.remove_task(task, cascade=cascade, **kwargs)
    return was_present

task_size

task_size() -> int

获取任务数量

源代码位于: logis/task/manager.py
def task_size(self) -> int:
    """
    Return the number of tasks, i.e. the number of nodes in the graph.
    """
    node_count = self.__graph__.number_of_nodes()
    return node_count

tasks

tasks(only_id: bool = False) -> Iterator[ITask]
源代码位于: logis/task/manager.py
def tasks(self, only_id: bool = False) -> Iterator[ITask]:
    """
    Iterate over every node of the graph.

    Args:
        only_id: When truthy, yield the node ids themselves; otherwise
            yield ``get_task(node_id)`` for each node.
    """
    for node_id in self.__graph__.nodes:
        if only_id:
            yield node_id
        else:
            yield self.get_task(node_id)

update_task_status

update_task_status(task: TaskId, status: TaskStatus)

将任务状态设置为指定值

源代码位于: logis/task/manager.py
def update_task_status(self, task: TaskId, status: TaskStatus):
    """
    Set the status of a task to the given value.

    Args:
        task: Task id (anything ``parse_task_id`` accepts).
        status: Status forwarded to the task's ``update_status``.

    Raises:
        AssertionError: If ``get_task`` yields nothing truthy for the id.
    """
    task_id = self.parse_task_id(task)
    target = self.get_task(task_id)
    assert target, f"任务 {task_id} 不存在"
    target.update_status(status)

TaskStatus

任务状态

源代码位于: logis/task/model/__init__.py
class TaskStatus(Enum):
    """
    Task status.

    Each member's value is the lowercase string form of its name.
    """

    NOT_STARTED = "not_started"
    STARTED = "started"
    PAUSED = "paused"
    FINISHED = "finished"
    CANCELLED = "cancelled"
    FAILED = "failed"

CANCELLED class-attribute instance-attribute

CANCELLED = 'cancelled'

FAILED class-attribute instance-attribute

FAILED = 'failed'

FINISHED class-attribute instance-attribute

FINISHED = 'finished'

NOT_STARTED class-attribute instance-attribute

NOT_STARTED = 'not_started'

PAUSED class-attribute instance-attribute

PAUSED = 'paused'

STARTED class-attribute instance-attribute

STARTED = 'started'

WorkingMode

工作模式

源代码位于: logis/task/iface/__init__.py
class WorkingMode(IWorkingMode, Enum, metaclass=ABCEnumMeta):
    """
    Working mode.
    """

    # ABCEnumMeta derives from both EnumMeta and ABCMeta; naming it as the
    # metaclass is what lets an Enum also inherit from IWorkingMode
    # (presumably an ABC-based interface; confirm) without a metaclass conflict.
    SYNC = "synchronous"
    ASYNC = "asynchronous"

ASYNC class-attribute instance-attribute

ASYNC = 'asynchronous'

SYNC class-attribute instance-attribute

SYNC = 'synchronous'