Skip to content

Stage

Individual processing stages that execute CPU-bound tasks in separate threads.

PipelineStage

PipelineStage(name, process_fn, input_queue, output_queue, enable_timing=True, return_exceptions=False)

Single stage in the CPU-intensive task pipeline.

Represents an individual processing stage that runs in its own thread and processes items from an input queue, applying a transformation function, and placing results in an output queue.

PARAMETER DESCRIPTION
name

Unique identifier for this stage, used in logging and timing analysis.

TYPE: str

process_fn

Function to process data items. Should be thread-safe and accept a single argument, returning processed data, None (to filter), or a generator (for multiple outputs).

TYPE: Callable

input_queue

Queue from which to read input items for processing.

TYPE: Queue

output_queue

Queue to which processed items are written.

TYPE: Queue

enable_timing

Whether to collect detailed timing information for this stage.

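TYPE: bool DEFAULT: True

return_exceptions

Flag stored on the stage for use by its worker loop (the loop itself is not shown on this page). Presumably it controls whether exceptions raised by process_fn are passed on as results rather than handled inside the stage; confirm against the worker implementation.

TYPE: bool DEFAULT: False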

Source code in src/async_task_pipeline/base/stage.py (lines 43–61)
def __init__(
    self,
    name: str,
    process_fn: Callable,
    input_queue: queue.Queue,
    output_queue: queue.Queue,
    enable_timing: bool = True,
    return_exceptions: bool = False,
):
    """Configure a pipeline stage without starting its worker thread.

    Parameters
    ----------
    name : str
        Identifier for this stage, used in log messages and the worker
        thread's name.
    process_fn : Callable
        Per-item processing function. It is wrapped with ``safe`` before
        being stored on the instance.
    input_queue : queue.Queue
        Queue this stage reads items from.
    output_queue : queue.Queue
        Queue this stage writes results to.
    enable_timing : bool, default True
        Whether timing statistics are collected. When False,
        ``get_average_processing_time`` always reports 0.0.
    return_exceptions : bool, default False
        Stored as-is for the worker loop (not shown here).
    """
    # Wiring: identity, the queues this stage sits between, and the
    # wrapped processing callable.
    self.name = name
    self.input_queue = input_queue
    self.output_queue = output_queue
    self.process_fn = safe(process_fn)

    # Behaviour flags.
    self.enable_timing = enable_timing
    self.return_exceptions = return_exceptions

    # Runtime state: ``start`` creates the thread and sets ``running``.
    self.thread: threading.Thread | None = None
    self.running = False

    # Accumulated statistics read by get_average_processing_time.
    self.processed_count = 0
    self.total_processing_time = 0.0

name instance-attribute

name = name

process_fn instance-attribute

process_fn = safe(process_fn)

input_queue instance-attribute

input_queue = input_queue

output_queue instance-attribute

output_queue = output_queue

thread instance-attribute

thread = None

running instance-attribute

running = False

processed_count instance-attribute

processed_count = 0

total_processing_time instance-attribute

total_processing_time = 0.0

enable_timing instance-attribute

enable_timing = enable_timing

return_exceptions instance-attribute

return_exceptions = return_exceptions

start

start()

Start the worker thread for this stage.

Creates and starts a daemon thread that will continuously process items from the input queue until stopped.

Notes

The worker thread is marked as daemon so it won't prevent the program from exiting.

Source code in src/async_task_pipeline/base/stage.py (lines 63–78)
def start(self) -> None:
    """Launch this stage's background worker.

    Sets ``running`` to True, then spawns a thread named
    ``Stage-<name>`` whose target is ``self._worker``.

    Notes
    -----
    The thread is created as a daemon, so an unfinished worker does not
    keep the interpreter alive at exit.
    """
    # Set before the thread exists, so the flag is already True when the
    # worker begins executing.
    self.running = True
    worker = threading.Thread(
        target=self._worker,
        name=f"Stage-{self.name}",
        daemon=True,
    )
    self.thread = worker
    worker.start()
    logger.info(f"Started pipeline stage: {self.name}")

stop

stop()

Stop the worker thread.

Signals the worker thread to stop and waits for it to complete. Sends an EndSignal sentinel to the input queue to wake up the worker if it is blocked waiting for input.

Notes

This method blocks until the worker thread has fully stopped.

Source code in src/async_task_pipeline/base/stage.py (lines 80–95)
def stop(self) -> None:
    """Stop the worker thread and wait for it to exit.

    Clears ``running``. If a worker thread is alive, it also enqueues an
    ``EndSignal`` on the input queue, to wake the worker if it is blocked
    waiting for input, and then joins the thread.

    Notes
    -----
    Blocks until a live worker thread has exited. If the stage was never
    started, or its worker has already exited, no sentinel is enqueued.
    A stray ``EndSignal`` left in the queue could be read by a later
    worker. On a full bounded queue with no consumer, ``put`` would
    block forever.
    """
    self.running = False
    worker = self.thread
    if worker is not None and worker.is_alive():
        # Wake the worker in case it is blocked waiting on input_queue.
        self.input_queue.put(EndSignal())
        worker.join()
    logger.info(f"Stopped pipeline stage: {self.name}")

get_average_processing_time

get_average_processing_time()

Get average processing time for this stage.

Calculates the average time spent in the processing function across all processed items.

RETURNS DESCRIPTION
float

Average processing time in seconds, or 0.0 if timing is disabled or no items have been processed.

Source code in src/async_task_pipeline/base/stage.py (lines 168–185)
def get_average_processing_time(self) -> float:
    """Return the mean time spent in the processing function per item.

    Returns
    -------
    float
        ``total_processing_time / processed_count`` in seconds, or 0.0
        when timing is disabled or no items have been processed yet.
    """
    # ``and`` short-circuits, so processed_count is not read when timing
    # is disabled.
    has_samples = self.enable_timing and self.processed_count > 0
    if has_samples:
        return self.total_processing_time / self.processed_count
    return 0.0

clear_input_queue

clear_input_queue()

Clear the input queue by discarding every item currently waiting in it.

Source code in src/async_task_pipeline/base/stage.py (lines 187–190)
def clear_input_queue(self) -> None:
    """Discard every item currently waiting in the input queue.

    Items are removed with non-blocking ``get_nowait`` until the queue
    reports ``queue.Empty``.

    Notes
    -----
    ``task_done`` is not called for the discarded items.
    """
    import queue  # local import keeps this method self-contained

    # Checking ``empty()`` and then calling a blocking ``get()`` is racy:
    # if another consumer takes the last item in between, ``get()`` would
    # block forever. Non-blocking removal avoids that window.
    while True:
        try:
            self.input_queue.get_nowait()
        except queue.Empty:
            break