Skip to content

Item

Data containers that flow through the pipeline with timing and sequence tracking.

PipelineItem

Bases: BaseModel, Generic[DataT]

Data container for pipeline processing.

A wrapper class that carries data through the pipeline along with metadata for tracking timing and performance analysis. Each item maintains detailed timing information as it flows through different pipeline stages.

PARAMETER DESCRIPTION
data

The actual data payload being processed through the pipeline.

TYPE: DataT

enable_timing

Whether to collect detailed timing information for performance analysis.

TYPE: bool DEFAULT: True

start_timestamp

Timestamp when the item entered the pipeline. Auto-generated if not provided.

TYPE: float

data instance-attribute

data

enable_timing class-attribute instance-attribute

enable_timing = True

start_timestamp class-attribute instance-attribute

start_timestamp = Field(default_factory=perf_counter)

record_entry_time

record_entry_time(stage_name)

Record when item enters a stage's input queue.

PARAMETER DESCRIPTION
stage_name

Name of the stage whose queue the item is entering.

TYPE: str

Source code in src/async_task_pipeline/base/item.py
57
58
59
60
61
62
63
64
65
66
@_if_timing_enabled
def record_entry_time(self, stage_name: str) -> None:
    """Stamp the moment this item is placed on a stage's input queue.

    The reading is taken from ``time.perf_counter()`` and replaces any
    entry stamp previously stored under the same stage name.

    Parameters
    ----------
    stage_name : str
        Name of the stage whose queue the item is entering.
    """
    entered_at = time.perf_counter()
    self._entry_timestamps[stage_name] = entered_at

record_completion_time

record_completion_time(stage_name)

Record when a stage completes processing this item.

PARAMETER DESCRIPTION
stage_name

Name of the stage that completed processing.

TYPE: str

Source code in src/async_task_pipeline/base/item.py
68
69
70
71
72
73
74
75
76
77
@_if_timing_enabled
def record_completion_time(self, stage_name: str) -> None:
    """Stamp the moment a stage finishes processing this item.

    The reading is taken from ``time.perf_counter()`` and replaces any
    completion stamp previously stored under the same stage name.

    Parameters
    ----------
    stage_name : str
        Name of the stage that completed processing.
    """
    finished_at = time.perf_counter()
    self._completion_timestamps[stage_name] = finished_at

record_detailed_timing

record_detailed_timing(stage_name, detailed_timing)

Record detailed timing for a stage

Source code in src/async_task_pipeline/base/item.py
79
80
81
82
@_if_timing_enabled
def record_detailed_timing(self, stage_name: str, detailed_timing: DetailedTiming) -> None:
    """Attach a stage's detailed timing record to this item.

    Parameters
    ----------
    stage_name : str
        Name of the stage the record belongs to; used as the storage key.
    detailed_timing : DetailedTiming
        Timing record to keep. A record already stored under the same
        stage name is overwritten.
    """
    per_stage = self._detailed_timings
    per_stage[stage_name] = detailed_timing

get_entry_time

get_entry_time(stage_name)

Get the time the item entered the queue for a stage

Source code in src/async_task_pipeline/base/item.py
84
85
86
87
88
89
@_if_timing_enabled
def get_entry_time(self, stage_name: str) -> float | None:
    """Get the time the item entered the queue for a stage.

    Parameters
    ----------
    stage_name : str
        Name of the stage to look up.

    Returns
    -------
    float | None
        The entry timestamp stored for ``stage_name``, or None when no
        entry time has been recorded for that stage.
    """
    # One lookup instead of a membership test followed by indexing;
    # ``get`` yields None for a missing key, matching the previous result.
    return self._entry_timestamps.get(stage_name)

get_completion_time

get_completion_time(stage_name)

Get the time the item completed processing for a stage

Source code in src/async_task_pipeline/base/item.py
91
92
93
94
95
96
@_if_timing_enabled
def get_completion_time(self, stage_name: str) -> float | None:
    """Get the time the item completed processing for a stage.

    Parameters
    ----------
    stage_name : str
        Name of the stage to look up.

    Returns
    -------
    float | None
        The completion timestamp stored for ``stage_name``, or None when
        no completion time has been recorded for that stage.
    """
    # One lookup instead of a membership test followed by indexing;
    # ``get`` yields None for a missing key, matching the previous result.
    return self._completion_timestamps.get(stage_name)

get_detailed_timing

get_detailed_timing(stage_name)

Get the detailed timing for a stage

Source code in src/async_task_pipeline/base/item.py
 98
 99
100
101
102
103
@_if_timing_enabled
def get_detailed_timing(self, stage_name: str) -> DetailedTiming | None:
    """Get the detailed timing record for a stage.

    Parameters
    ----------
    stage_name : str
        Name of the stage to look up.

    Returns
    -------
    DetailedTiming | None
        The timing record stored for ``stage_name``, or None when no
        detailed timing has been recorded for that stage.
    """
    # One lookup instead of a membership test followed by indexing;
    # ``get`` yields None for a missing key, matching the previous result.
    return self._detailed_timings.get(stage_name)

get_total_latency

get_total_latency()

Calculate total end-to-end latency.

Computes the time from when the item entered the pipeline until the last stage completed processing it.

RETURNS DESCRIPTION
float | None

Total latency in seconds, or None if timing is disabled or no stages have completed processing.

Source code in src/async_task_pipeline/base/item.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@_if_timing_enabled
def get_total_latency(self) -> float | None:
    """Return the end-to-end latency of this item.

    The latency is the gap between ``start_timestamp`` and the latest
    completion timestamp recorded for any stage.

    Returns
    -------
    float | None
        Total latency in seconds, or None if timing is disabled or
        no stages have completed processing.
    """
    completions = self._completion_timestamps
    if not completions:
        # Nothing has finished yet, so there is no end point to measure to.
        return None

    pipeline_start = self.start_timestamp
    if pipeline_start is None:
        return None

    return max(completions.values()) - pipeline_start

get_stage_latencies

get_stage_latencies()

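Calculate the latency of each stage, measured as the time between consecutive stage completions (the first stage is measured from the pipeline start timestamp).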

Source code in src/async_task_pipeline/base/item.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@_if_timing_enabled
def get_stage_latencies(self) -> dict[str, float] | None:
    """Compute the latency attributed to each completed stage.

    Stages are ordered by completion timestamp. Each stage's latency is
    its completion time minus the completion time of the stage before it;
    the earliest stage is measured from ``start_timestamp``.

    Returns
    -------
    dict[str, float] | None
        Mapping of stage name to latency, in completion order, or None
        when no completion has been recorded or ``start_timestamp`` is None.
    """
    if not self._completion_timestamps:
        return None
    if self.start_timestamp is None:
        return None

    ordered = list(self._completion_timestamps.items())
    ordered.sort(key=lambda entry: entry[1])

    finish_times = [finished for _, finished in ordered]
    # Each stage starts counting where the previous one finished; the
    # first stage counts from the pipeline start.
    begin_times = [self.start_timestamp, *finish_times[:-1]]

    return {
        name: finished - began
        for (name, finished), began in zip(ordered, begin_times)
    }

get_timing_breakdown

get_timing_breakdown()

Get detailed timing breakdown for each stage.

Provides comprehensive timing analysis including queue wait times, computation times, transmission times, and overall efficiency metrics.

RETURNS DESCRIPTION
dict[str, dict[str, float]] | None

Dictionary with per-stage timing breakdowns and totals. Each stage entry contains queue_wait_time, computation_time, transmission_time, and total_stage_time; the "totals" entry contains total_computation_time, total_overhead_time, total_latency, and computation_ratio. Returns None if timing is disabled or no detailed timings are available.

Source code in src/async_task_pipeline/base/item.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
@_if_timing_enabled
def get_timing_breakdown(self) -> dict[str, dict[str, float]] | None:
    """Build a per-stage and overall timing report for this item.

    Each recorded stage contributes its queue wait, computation and
    transmission times plus their sum. An extra ``"totals"`` entry reports
    how much wall-clock time had at least one stage computing, the
    remaining overhead, the total latency and the computation ratio.

    Returns
    -------
    dict[str, dict[str, float]] | None
        Dictionary with per-stage timing breakdowns and totals, including:
        - Per-stage: queue_wait_time, computation_time, transmission_time,
          total_stage_time
        - Totals: total_computation_time, total_overhead_time,
          total_latency, computation_ratio
        Returns None if timing is disabled or no detailed timings available.
    """
    if not self._detailed_timings:
        return None
    if self.start_timestamp is None:
        return None

    pipeline_start = self.start_timestamp

    # Per-stage figures, copied straight from the recorded timings.
    report: dict[str, dict[str, float]] = {}
    for name, stage in self._detailed_timings.items():
        report[name] = {
            "queue_wait_time": stage.queue_wait_time,
            "computation_time": stage.computation_time,
            "transmission_time": stage.transmission_time,
            "total_stage_time": stage.queue_wait_time + stage.computation_time + stage.transmission_time,
        }

    total_latency = self.get_total_latency()

    # Timeline of compute start/end events, seeded with the pipeline start.
    timeline: list[tuple[float, str, str | None]] = [(pipeline_start, "start", None)]
    for name, stage in self._detailed_timings.items():
        timeline.append((stage.processing_start_time, "compute_start", name))
        timeline.append((stage.processing_end_time, "compute_end", name))
    # Stable sort on the timestamp only, so simultaneous events keep insertion order.
    timeline.sort(key=lambda event: event[0])

    # Sweep the timeline, counting only the spans during which at least one
    # stage is computing, so overlapping stages are not counted twice.
    busy_time = 0.0
    cursor = pipeline_start
    active: set[str | None] = set()
    for moment, kind, name in timeline:
        if active:
            busy_time += moment - cursor

        if kind == "compute_start":
            active.add(name)
        elif kind == "compute_end":
            active.discard(name)

        cursor = moment

    # Any stage still marked active is counted up to the end of the pipeline.
    pipeline_end = pipeline_start + (total_latency or 0.0)
    if active and cursor < pipeline_end:
        busy_time += pipeline_end - cursor

    if total_latency:
        overhead_time = total_latency - busy_time
        busy_ratio = busy_time / total_latency
    else:
        # Missing or zero latency: report zeros rather than dividing by it.
        overhead_time = 0.0
        busy_ratio = 0.0

    report["totals"] = {
        "total_computation_time": busy_time,
        "total_overhead_time": overhead_time,
        "total_latency": 0.0 if total_latency is None else total_latency,
        "computation_ratio": busy_ratio,
    }

    return report