Utilities

Utility modules for performance analysis, logging, and metrics collection.

Performance Analysis

log_pipeline_performance_analysis

log_pipeline_performance_analysis(pipeline)

Log comprehensive performance analysis for an AsyncTaskPipeline.

Analyzes pipeline performance and logs detailed metrics including overall efficiency, per-stage breakdowns, and individual item timing analysis. This function is useful for identifying bottlenecks and optimizing pipeline performance.

PARAMETER DESCRIPTION
pipeline

The pipeline instance to analyze. Must have timing enabled and have processed at least one item.

TYPE: AsyncTaskPipeline

Notes

This function logs analysis results using the framework's shared async_task_pipeline logger (documented in the Logging section below). If timing is disabled on the pipeline, only an informational message is logged and no analysis is performed.

Examples:

>>> pipeline = AsyncTaskPipeline(enable_timing=True)
>>> # ... process some data ...
>>> log_pipeline_performance_analysis(pipeline)
Source code in src/async_task_pipeline/utils/analysis.py
def log_pipeline_performance_analysis(pipeline: "AsyncTaskPipeline") -> None:  # pragma: no cover
    """Log comprehensive performance analysis for an AsyncTaskPipeline.

    Analyzes pipeline performance and logs detailed metrics including overall
    efficiency, per-stage breakdowns, and individual item timing analysis.
    This function is useful for identifying bottlenecks and optimizing
    pipeline performance.

    Parameters
    ----------
    pipeline : AsyncTaskPipeline
        The pipeline instance to analyze. Must have timing enabled and
        have processed at least one item.

    Notes
    -----
    This function logs analysis results using the framework's shared
    async_task_pipeline logger. If timing is disabled on the pipeline,
    only an informational message is logged.

    Examples
    --------
    >>> pipeline = AsyncTaskPipeline(enable_timing=True)
    >>> # ... process some data ...
    >>> log_pipeline_performance_analysis(pipeline)
    """
    if not pipeline.enable_timing:
        logger.info("Pipeline timing is disabled. No analysis available.")
        return

    summary = pipeline.get_report()
    if summary is None:
        logger.info("No items processed. No analysis available.")
        return

    logger.info("Enhanced Pipeline Performance Analysis:")
    logger.info(f" Total items processed: {summary.summary.total_items}")
    logger.info(f" Average end-to-end latency: {summary.summary.avg_total_latency:.3f}s")

    efficiency = summary.summary.overall_efficiency
    logger.info(" Efficiency Details:")
    logger.info(f"  Computation efficiency: {efficiency.computation_efficiency:.1%}")
    logger.info(f"  Overhead ratio: {efficiency.overhead_ratio:.1%}")

    logger.info(" Per-Stage Performance Breakdown:")
    for stage_name, stats in summary.summary.stage_statistics.items():
        timing = stats.timing_breakdown
        logger.info(f" - {stage_name}:")
        logger.info(f"   Processed: {stats.processed_count} items")
        logger.info(f"   Avg computation time: {timing.avg_computation_time * 1000:.2f}ms")
        logger.info(f"   Avg queue wait time: {timing.avg_queue_wait_time * 1000:.2f}ms")
        logger.info(f"   Avg transmission time: {timing.avg_transmission_time * 1000:.2f}ms")
        logger.info(f"   Computation ratio: {timing.computation_ratio:.1%}")

    if summary.item_breakdowns is None:
        logger.info("No item breakdowns available.")
        return

    logger.info(" Detailed Analysis for First Few Items:")
    for item in summary.item_breakdowns[:3]:
        logger.info(" - Item:")
        totals = item.totals
        logger.info(f"   Total latency: {totals.total_latency * 1000:.2f}ms")
        logger.info(f"   Actual computation time: {totals.total_computation_time * 1000:.2f}ms")
        logger.info(f"   Actual overhead time: {totals.total_overhead_time * 1000:.2f}ms")
        logger.info(f"   Computation ratio: {totals.computation_ratio:.1%}")

Metrics

DetailedTiming

Bases: BaseModel

Detailed timing information for a pipeline stage.

Captures precise timing measurements for different phases of item processing within a pipeline stage, enabling detailed performance analysis and bottleneck identification.

PARAMETER DESCRIPTION
queue_enter_time

Timestamp when the item entered the stage's input queue.

TYPE: float

processing_start_time

Timestamp when the stage began processing the item.

TYPE: float

processing_end_time

Timestamp when the stage finished processing the item.

TYPE: float

queue_exit_time

Timestamp when the processed item was placed in the output queue.

TYPE: float

queue_enter_time instance-attribute

queue_enter_time

processing_start_time instance-attribute

processing_start_time

processing_end_time instance-attribute

processing_end_time

queue_exit_time instance-attribute

queue_exit_time

queue_wait_time property

queue_wait_time

Time spent waiting in input queue.

RETURNS DESCRIPTION
float

Duration in seconds between queue entry and processing start.

computation_time property

computation_time

Time spent in actual computation.

RETURNS DESCRIPTION
float

Duration in seconds of the actual processing function execution.

transmission_time property

transmission_time

Time spent in transmission to next stage.

RETURNS DESCRIPTION
float

Duration in seconds between processing completion and output queue placement.
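
Each property is the difference of two consecutive timestamps: queue_wait_time is processing_start_time - queue_enter_time, computation_time is processing_end_time - processing_start_time, and transmission_time is queue_exit_time - processing_end_time. A minimal sketch, assuming DetailedTiming is importable from async_task_pipeline.utils (the exact import path may differ) and that the properties are the plain timestamp differences described above:

>>> from async_task_pipeline.utils import DetailedTiming
>>> timing = DetailedTiming(
...     queue_enter_time=100.000,
...     processing_start_time=100.005,
...     processing_end_time=100.025,
...     queue_exit_time=100.026,
... )
>>> round(timing.queue_wait_time, 3)    # waiting in the input queue
0.005
>>> round(timing.computation_time, 3)   # inside the processing function
0.02
>>> round(timing.transmission_time, 3)  # handing off to the output queue
0.001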

Logging

logging

Logging utilities for the async task pipeline framework.

This module provides a centralized logger instance used throughout the pipeline framework for consistent logging behavior.

FORMAT module-attribute

FORMAT = '%(message)s'

logger module-attribute

logger = getLogger('async_task_pipeline')

Logger instance for the async task pipeline framework.

This logger is used throughout the framework for consistent logging. Configure it at the application level to control log output format, level, and destinations.

Examples:

>>> import logging
>>> from async_task_pipeline.utils import logger
>>>
>>> # Configure logging level
>>> logger.setLevel(logging.INFO)
>>>
>>> # Add a handler
>>> handler = logging.StreamHandler()
>>> logger.addHandler(handler)
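
The FORMAT module attribute can be combined with a standard Formatter so handler output matches the framework's message-only style. A short sketch, assuming FORMAT is importable from the same async_task_pipeline.utils package as logger:

>>> import logging
>>> from async_task_pipeline.utils import FORMAT, logger
>>>
>>> handler = logging.StreamHandler()
>>> handler.setFormatter(logging.Formatter(FORMAT))
>>> logger.addHandler(handler)
>>> logger.setLevel(logging.INFO)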