class IMetricCollector(ABC):
"""
指标收集器
"""
def __init__(self, write_buffer: Optional[WriteBuffer] = None, **kwargs):
super().__init__(**kwargs)
self._buffer = write_buffer or AsyncWriteBuffer(
WriteBufferConfig(batch_size=100, flush_interval=10, handler=self.submit)
)
invoke(self._buffer.start)
@abstractmethod
def submit(self, metrics: List[T]):
"""
提交指标
"""
pass
def collect(self, metric: T):
"""
缓冲指标
"""
invoke(self._buffer.put, metric)
def flush(self, flush_all: bool = False, **kwargs):
"""
刷新指标
"""
invoke(self._buffer.flush, flush_all=flush_all, **kwargs)