Pipeline¶
The main pipeline orchestrator that manages data flow through multiple processing stages.
AsyncTaskPipeline¶
AsyncTaskPipeline(max_queue_size=100, enable_timing=False, return_exceptions=False)
Bases: Generic[T, U]
Main pipeline orchestrator with async I/O and thread-based processing.
This class manages a multi-stage data processing pipeline that combines async I/O for input/output operations with thread-based processing for CPU-intensive tasks. It provides comprehensive timing analysis and performance monitoring capabilities. It takes two type parameters:
- T: The type of the data you want to process (e.g. messages, signals or events)
- U: The type of the pass-through data (e.g. exceptions)
PARAMETER | DESCRIPTION
---|---
`max_queue_size` | Maximum size for inter-stage queues. Controls memory usage and backpressure. TYPE: `int` DEFAULT: `100`
`enable_timing` | Whether to enable detailed timing analysis for performance monitoring. TYPE: `bool` DEFAULT: `False`
`return_exceptions` | Whether to return exceptions in the output stream. TYPE: `bool` DEFAULT: `False`
Examples:
>>> pipeline = AsyncTaskPipeline(max_queue_size=50, enable_timing=True)
>>> pipeline.add_stage("process", lambda x: x * 2)
>>> await pipeline.start()
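For orientation, here is a fuller end-to-end sketch built only from the methods documented on this page; the import path, the stage functions, and the input source are assumptions, not verbatim library code:

```python
import asyncio

from async_task_pipeline import AsyncTaskPipeline  # assumed import path


async def main() -> None:
    pipeline = AsyncTaskPipeline(max_queue_size=50, enable_timing=True)
    pipeline.add_stage("double", lambda x: x * 2)      # CPU work runs in a thread
    pipeline.add_stage("label", lambda x: f"out={x}")  # stages run in added order

    await pipeline.start()

    async def source():
        for i in range(5):
            yield i

    # Feed input and consume output concurrently.
    feeder = asyncio.create_task(pipeline.process_input_stream(source()))

    results = []
    async for item in pipeline.generate_output_stream():
        results.append(item)
        if len(results) == 5:  # we sent exactly 5 items
            break

    await feeder
    await pipeline.stop()
    print(results)


asyncio.run(main())
```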
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 54–66.
add_stage¶
add_stage(name, process_fn)
Add a processing stage to the pipeline.
Creates a new stage with the specified processing function and connects it to the pipeline's queue system. Stages are executed in the order they are added.
PARAMETER | DESCRIPTION
---|---
`name` | Unique identifier for this stage, used in timing analysis and logging. TYPE: `str`
`process_fn` | Function to process data items. Should accept a single argument and return processed data, `None` (to filter the item out), or a generator (to produce multiple outputs). TYPE: `Callable`
Notes
The process_fn should be thread-safe as it will be executed in a separate thread. If the function returns None, the item is filtered out. If it returns a generator, each yielded value becomes a separate pipeline item.
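A short sketch of the three return conventions described above, assuming `pipeline` is an `AsyncTaskPipeline` instance; the stage names and functions are illustrative:

```python
def drop_odd(x: int):
    # Returning None filters the item out of the pipeline.
    return x if x % 2 == 0 else None


def fan_out(x: int):
    # Returning a generator turns one input into several pipeline items.
    for i in range(x):
        yield i


pipeline.add_stage("drop_odd", drop_odd)       # filter stage
pipeline.add_stage("fan_out", fan_out)         # 1:N fan-out stage
pipeline.add_stage("double", lambda x: x * 2)  # 1:1 transform
```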
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 68–108.
start async¶
start()
Start all pipeline stages.
Initializes and starts worker threads for all registered stages. The pipeline must be started before processing any data.
RAISES | DESCRIPTION
---|---
`RuntimeError` | If the pipeline is already running or if no stages have been added.
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 110–124.
stop async¶
stop()
Stop all pipeline stages.
Gracefully shuts down all worker threads and clears pipeline state. This method should be called when pipeline processing is complete.
Notes
Stages are stopped in reverse order to ensure proper cleanup. Any remaining items in queues will be lost.
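Since items still queued at shutdown are lost, wrapping the processing phase in `try`/`finally` is a reasonable pattern. A sketch, reusing `pipeline` and `source()` from the earlier examples:

```python
await pipeline.start()
try:
    await pipeline.process_input_stream(source())
finally:
    # Always shut down worker threads, even if feeding fails;
    # any items still queued at this point are discarded.
    await pipeline.stop()
```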
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 126–140.
process_input_stream async¶
process_input_stream(input_stream)
Consume async input stream and feed to pipeline.
Processes an async iterator/generator and feeds each item into the pipeline for processing. This method handles the async-to-sync bridge for pipeline input.
PARAMETER | DESCRIPTION
---|---
`input_stream` | Async iterator that yields data items to be processed.
Notes
This method will consume the entire input stream. For continuous processing, use individual `process_input_data` calls.
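A minimal sketch of feeding an async generator into the pipeline; the `readings` generator is illustrative:

```python
import asyncio


async def readings():
    # Any async iterator works here: sockets, files, sensors, ...
    for i in range(10):
        await asyncio.sleep(0.01)  # simulate I/O latency
        yield i


# Returns once the entire stream has been consumed and enqueued.
await pipeline.process_input_stream(readings())
```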
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 142–164.
interrupt async¶
interrupt()
Interrupt the pipeline.
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 166–170.
process_input_data async¶
process_input_data(data, created_at)
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 172–173.
process_input_sentinel async¶
process_input_sentinel(sentinel)
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 175–176.
put_input_sentinel¶
put_input_sentinel(sentinel)
Put a sentinel value into the input queue.
This method is used to signal the end of the input stream. It is typically used in conjunction with `process_input_stream`.
PARAMETER | DESCRIPTION
---|---
`sentinel` | The sentinel value to put into the input queue.

RAISES | DESCRIPTION
---|---
`RuntimeError` | If the pipeline is not running.
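A sketch of signaling end-of-input; the `END_OF_STREAM` marker is an illustrative choice, since any value of the pass-through type `U` can serve as a sentinel:

```python
END_OF_STREAM = object()  # illustrative sentinel; reappears in the output stream

await pipeline.process_input_stream(source())
pipeline.put_input_sentinel(END_OF_STREAM)  # raises RuntimeError if not running
```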
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 178–205.
put_input_data¶
put_input_data(data, created_at)
Put data into the input queue.
This method is used to feed data into the pipeline. It is typically used in conjunction with `process_input_data`.
PARAMETER | DESCRIPTION
---|---
`data` | The data to put into the input queue.
`created_at` | The timestamp when the data was created.

RAISES | DESCRIPTION
---|---
`RuntimeError` | If the pipeline is not running.
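A minimal sketch; this page does not show the expected type of `created_at`, so a Unix timestamp from `time.time()` is assumed here:

```python
import time

pipeline.put_input_data({"reading": 42}, created_at=time.time())
```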
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 207–242.
generate_output_stream async¶
generate_output_stream()
Generate async output stream from pipeline, maintaining order.
Creates an async iterator that yields processed items as they become available from the pipeline. Items are yielded in the order they were processed (which may differ from input order due to parallel processing).
YIELDS | DESCRIPTION
---|---
`T \| U` | Processed data items or sentinel values from the pipeline.
Notes
This method will continue yielding items until the pipeline is stopped and all queues are empty. It's typically used in an async for loop.
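A consumption sketch; `handle` is a hypothetical downstream callback, and `END_OF_STREAM` is the illustrative sentinel from the `put_input_sentinel` example above:

```python
async for item in pipeline.generate_output_stream():
    if isinstance(item, Exception):
        # Exceptions appear as items only when return_exceptions=True.
        print(f"stage failed: {item!r}")
        continue
    if item is END_OF_STREAM:
        break  # the sentinel reached the output: input is exhausted
    handle(item)  # hypothetical consumer
```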
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 244–278.
get_report¶
get_report(include_item_details=True)
Get summary statistics for pipeline latency.
Computes comprehensive performance statistics including end-to-end latency, per-stage timing breakdowns, and efficiency metrics.
RETURNS | DESCRIPTION
---|---
`PipelineAnalysis \| None` | Pipeline analysis including summary statistics and detailed item breakdowns. `None` if no items have been processed or timing is disabled.
Notes
Only available when timing is enabled. Returns None if no items have been processed or timing is disabled.
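A sketch of pulling a timing report; the printed shape of `PipelineAnalysis` is not documented on this page, so the result is simply passed to `print`:

```python
pipeline = AsyncTaskPipeline(enable_timing=True)  # timing must be enabled
# ... add stages, start, run data through, stop ...
report = pipeline.get_report(include_item_details=False)
if report is not None:  # None when timing is off or nothing was processed
    print(report)
```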
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 287–410.
clear¶
clear()
Clear the pipeline state and queues.
Removes all items from input/output queues and stage queues, resets completed items list, and resets the sequence counter. This method is useful for resetting the pipeline between runs.
Notes
This method should only be called when the pipeline is stopped. Any items currently being processed may be lost.
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 412–431.
clear_input_queue¶
clear_input_queue()
Clear the input queue.
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 433–437.
clear_output_queue¶
clear_output_queue()
Clear the output queue.
Source code in `src/async_task_pipeline/base/pipeline.py`, lines 439–443.