Stage¶
Individual processing stages that execute CPU-bound tasks in separate threads.
PipelineStage
¶
PipelineStage(name, process_fn, input_queue, output_queue, enable_timing=True, return_exceptions=False)
Single stage in the CPU-intensive task pipeline.
Represents an individual processing stage that runs in its own thread and processes items from an input queue, applying a transformation function, and placing results in an output queue.
PARAMETER | DESCRIPTION |
---|---|
name
|
Unique identifier for this stage, used in logging and timing analysis.
TYPE:
|
process_fn
|
Function to process data items. Should be thread-safe and accept a single argument, returning processed data, None (to filter), or a generator (for multiple outputs).
TYPE:
|
input_queue
|
Queue from which to read input items for processing.
TYPE:
|
output_queue
|
Queue to which processed items are written.
TYPE:
|
enable_timing
|
Whether to collect detailed timing information for this stage.
TYPE:
|
Source code in src/async_task_pipeline/base/stage.py
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
|
start
¶
start()
Start the worker thread for this stage.
Creates and starts a daemon thread that will continuously process items from the input queue until stopped.
Notes
The worker thread is marked as daemon so it won't prevent the program from exiting.
Source code in src/async_task_pipeline/base/stage.py
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
|
stop
¶
stop()
Stop the worker thread.
Signals the worker thread to stop and waits for it to complete. Sends a sentinel value (None) to the input queue to wake up the worker if it's waiting.
Notes
This method blocks until the worker thread has fully stopped.
Source code in src/async_task_pipeline/base/stage.py
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
|
get_average_processing_time
¶
get_average_processing_time()
Get average processing time for this stage.
Calculates the average time spent in the processing function across all processed items.
RETURNS | DESCRIPTION |
---|---|
float
|
Average processing time in seconds, or 0.0 if timing is disabled or no items have been processed. |
Source code in src/async_task_pipeline/base/stage.py
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
|
clear_input_queue
¶
clear_input_queue()
Clear the input queue
Source code in src/async_task_pipeline/base/stage.py
187 188 189 190 |
|