Processor
The Processor is the fundamental unit of work in the GenAI Processors library. It encapsulates logic, AI models, and tools into a composable, asynchronous interface.
For a runnable introduction to the concepts described here, check out the Processor Introduction Notebook.
The Core Concept
At its heart, a Processor is a transformation pipeline: it takes an input stream of ProcessorPart objects and produces an output stream of parts. Parts can carry different modalities, such as text, images, or files, which lets a pipeline handle complex, multimodal data.
To make development easier, the library handles the heavy lifting of unifying
data types for you. When receiving a stream, you'll work with a
ProcessorStream class (inheriting from ContentStream); this acts as an
AsyncIterable, but also provides convenient accessors like .text() for quick
extraction. When it’s time to send data back, you can simply yield whatever you
have, whether that’s a raw string or a part, and the library will automatically
normalize it into processor parts, filling in default values for any arguments
you didn't provide. This reliance on a standard AsyncIterable interface is
what makes it so simple to chain multiple Processors into a single, continuous
pipeline.
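To make the chaining idea concrete, here is a minimal sketch in plain asyncio, without the library. Part, to_part, and the stage functions are hypothetical stand-ins for ProcessorPart and the library's normalization. The point is only that stages sharing an AsyncIterable interface compose by simple nesting.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterable, AsyncIterator, List, Union


@dataclass
class Part:
    # Hypothetical stand-in for ProcessorPart: text plus a mimetype.
    text: str
    mimetype: str = "text/plain"


def to_part(value: Union[str, Part]) -> Part:
    # Normalize a raw string into a Part, filling in the default mimetype.
    return value if isinstance(value, Part) else Part(text=value)


async def source(*texts: str) -> AsyncIterator[Part]:
    for text in texts:
        yield to_part(text)


async def upper(parts: AsyncIterable[Part]) -> AsyncIterator[Part]:
    async for part in parts:
        # A stage may yield a raw string...
        yield to_part(part.text.upper())


async def exclaim(parts: AsyncIterable[Part]) -> AsyncIterator[Part]:
    async for part in parts:
        # ...or a fully built Part, which to_part leaves unchanged.
        yield to_part(Part(part.text + "!", mimetype=part.mimetype))


async def collect(parts: AsyncIterable[Part]) -> List[Part]:
    return [part async for part in parts]


# Every stage consumes and produces an AsyncIterable, so stages nest freely.
result = asyncio.run(collect(exclaim(upper(source("hi", "there")))))
```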
The Processor Interface
Processors provide the following interface:
```python
def __call__(
    self, content: AsyncIterable[ProcessorPartTypes] | ProcessorContentTypes
) -> ProcessorStream:
    ...
```
There are two ways to define a Processor:
Functional Definition
For simple logic that does not maintain state between processor calls, you can
use the @processor.processor_function decorator.
```python
from typing import AsyncIterable

from genai_processors import content_api, processor


@processor.processor_function
async def simple_filter(
    content: content_api.ProcessorStream,
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    """Keeps only text parts, buffering them and yielding their concatenation."""
    text_buffer = []
    async for part in content:
        if content_api.is_text(part.mimetype):
            text_buffer.append(part.text)
    # Emit a single text part once the whole input stream has been consumed.
    yield "".join(text_buffer)
```
You’ll notice that a Processor’s implementation signature differs slightly from
its usage signature. This is intentional. When implementing a processor, you get
the full power of the ProcessorStream input and the flexibility to return any
valid ProcessorPartTypes. When calling a processor, the library allows you to
pass in various data types as input while ensuring you always receive a
ProcessorStream back, giving you a consistent, feature-rich output to work
with.
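The split between implementation and usage signatures can be sketched with an ordinary decorator. This illustrates the general pattern under our own assumptions and is not the library's code: stream_function, _as_stream, and shout are hypothetical names, and plain strings stand in for parts.

```python
import asyncio
import functools
from typing import AsyncIterable, AsyncIterator, Callable, Iterable, List, Union

# What a caller may pass in: one string, a list of strings, or an async stream.
Input = Union[str, Iterable[str], AsyncIterable[str]]


async def _as_stream(content: Input) -> AsyncIterator[str]:
    # Coerce any supported input into a single async stream.
    if isinstance(content, str):
        yield content
    elif hasattr(content, "__aiter__"):
        async for item in content:
            yield item
    else:
        for item in content:
            yield item


def stream_function(impl: Callable[[AsyncIterator[str]], AsyncIterator[str]]):
    # The implementation always receives a stream; callers may pass any Input.
    @functools.wraps(impl)
    def wrapper(content: Input) -> AsyncIterator[str]:
        return impl(_as_stream(content))

    return wrapper


@stream_function
async def shout(content: AsyncIterator[str]) -> AsyncIterator[str]:
    async for text in content:
        yield text.upper()


async def collect(stream: AsyncIterable[str]) -> List[str]:
    return [item async for item in stream]
```

Here shout("hi"), shout(["a", "b"]), and shout(some_async_generator) all return an async stream, while the body of shout only ever handles one input type.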
Class-Based Definition
For logic requiring internal state (like API clients), subclass
processor.Processor and keep the state as an instance attribute.
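```python
from typing import Any, AsyncIterable

from genai_processors import content_api, processor


class MyStatefulProcessor(processor.Processor):

    def __init__(self, api_client: Any):
        # api_client stands in for any client with init() and an async request().
        self._api_client = api_client
        self._api_client.init()

    async def call(
        self, content: content_api.ProcessorStream
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        # Gather the whole input stream as text, then send a single request.
        request_str = await content_api.ContentStream(parts=content).text()
        response = await self._api_client.request(request_str)
        yield response
```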
The PartProcessor Interface
Parts in a stream can often be processed independently, for example image preprocessing or "verbalizing" PDFs into text and images. In these cases, we can apply a critical optimization: not only can these parts be processed in parallel, but we can also avoid head-of-line blocking when multiple processors are chained into a pipeline.
To simplify this, we provide a specialized PartProcessor class designed to
handle a single part. The library automatically parallelizes the incoming stream
across a stack of these processors and ensures the results are reassembled in
the correct order for the next stage of the pipeline.
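The ordering guarantee can be illustrated in plain asyncio. Work starts on each item as soon as it arrives, and results are handed downstream in input order by awaiting tasks first-in, first-out. ordered_concurrent_map and the demo functions are hypothetical. This is a sketch of the idea, not the library's scheduler, which also handles chains of PartProcessors.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def ordered_concurrent_map(
    fn: Callable[[T], Awaitable[R]], items: AsyncIterable[T]
) -> AsyncIterator[R]:
    """Starts fn on each item as soon as it arrives; yields results in input order."""
    pending: asyncio.Queue = asyncio.Queue()  # unbounded FIFO of tasks, then None

    async def feed() -> None:
        try:
            async for item in items:
                # Schedule work immediately so items are processed concurrently.
                pending.put_nowait(asyncio.ensure_future(fn(item)))
        finally:
            pending.put_nowait(None)  # sentinel: no more input

    feeder = asyncio.ensure_future(feed())
    try:
        while (task := await pending.get()) is not None:
            # Awaiting tasks in FIFO order restores the original ordering.
            yield await task
        await feeder  # re-raise any error from the input stream
    finally:
        feeder.cancel()
        while not pending.empty():
            leftover = pending.get_nowait()
            if leftover is not None:
                leftover.cancel()


finished: List[int] = []


async def slow_double(x: int) -> int:
    await asyncio.sleep(0.01 * (5 - x))  # earlier items take longer
    finished.append(x)
    return 2 * x


async def numbers() -> AsyncIterator[int]:
    for i in range(5):
        yield i


async def main() -> List[int]:
    return [r async for r in ordered_concurrent_map(slow_double, numbers())]
```

In the demo, later items finish first, yet main() still returns [0, 2, 4, 6, 8].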
Why use PartProcessor?
- Simplicity: You write logic for one part, not a loop over a stream.
- Performance: The library automatically runs PartProcessor logic concurrently on incoming parts.
```python
@processor.part_processor_function
async def shouter(
    part: content_api.ProcessorPart
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    if content_api.is_text(part.mimetype):
        yield part.text.upper()
    else:
        yield part
```
Filtering with match_fn
You can attach a match function to explicitly define which parts this processor handles. This improves readability and performance and is recommended.
```python
def is_text_part(part: content_api.ProcessorPart) -> bool:
    return content_api.is_text(part.mimetype)


@processor.part_processor_function(match_fn=is_text_part)
async def shouter(
    part: content_api.ProcessorPart
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    # We are guaranteed that 'part' is text because of match_fn
    yield part.text.upper()
```
Comparison: Processor vs. PartProcessor
| Feature | Processor | PartProcessor |
|---|---|---|
| Input | Stream (AsyncIterable) | Single Item (ProcessorPart) |
| State | Can maintain state across the stream | Stateless per-part |
| Concurrency | Sequential (unless manually managed) | Automatic (Parallelized DFS map over a chain of PartProcessors) |
| Use Case | Buffers, Accumulators, Full Context Models | Filters, Formatters, Independent Transformations |
Composition
The power of this library lies in composing simple processors into complex pipelines.
Sequential Chaining (+)
The + operator pipes the output of one processor into the input of the next.
It takes care of concurrency under the hood and is the recommended way of
chaining Processors or PartProcessors together.
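A toy version of + can be built with operator overloading on a wrapper around stream-to-stream functions. Stage, strip, upper, and run are hypothetical names for this sketch. It only shows how (a + b)(stream) becomes b(a(stream)). Unlike the library's +, it adds no concurrency between stages.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable, List

StreamFn = Callable[[AsyncIterable[str]], AsyncIterator[str]]


class Stage:
    """Toy processor: wraps a stream-to-stream function and supports `+`."""

    def __init__(self, fn: StreamFn):
        self._fn = fn

    def __call__(self, stream: AsyncIterable[str]) -> AsyncIterator[str]:
        return self._fn(stream)

    def __add__(self, other: "Stage") -> "Stage":
        # (a + b)(stream) == b(a(stream)): a's output stream becomes b's input.
        return Stage(lambda stream: other(self(stream)))


@Stage
async def strip(stream: AsyncIterable[str]) -> AsyncIterator[str]:
    async for text in stream:
        yield text.strip()


@Stage
async def upper(stream: AsyncIterable[str]) -> AsyncIterator[str]:
    async for text in stream:
        yield text.upper()


async def run(stage: Stage, items: List[str]) -> List[str]:
    async def source() -> AsyncIterator[str]:
        for item in items:
            yield item

    return [out async for out in stage(source())]


pipeline = strip + upper
```

Here run(pipeline, [" a ", "b "]) returns ["A", "B"].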
Parallel Execution
You can branch execution to run multiple processors simultaneously.
Parallel Parts (//)
Use // to run multiple PartProcessors on the same input part concurrently. The
results are concatenated (preserving input order).
```python
# For each input part, run a classifier AND a logger for that part.
classifier_and_logger = classifier_part_processor // logger_part_processor
```
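The per-part fan-out behind // can be sketched with asyncio.gather, which runs the branches concurrently but returns their results in argument order. parallel_parts, classify, and log are hypothetical. Plain strings stand in for parts. Ordering outputs by operand position is an assumption of this sketch.

```python
import asyncio
from typing import AsyncIterator, Callable, List

PartFn = Callable[[str], AsyncIterator[str]]


def parallel_parts(*fns: PartFn) -> PartFn:
    """Runs every fn on the same part concurrently; concatenates outputs in fn order."""

    async def run(part: str) -> AsyncIterator[str]:
        async def drain(fn: PartFn) -> List[str]:
            return [out async for out in fn(part)]

        # gather runs the branches concurrently but returns results in argument order.
        for outputs in await asyncio.gather(*(drain(fn) for fn in fns)):
            for out in outputs:
                yield out

    return run


async def classify(part: str) -> AsyncIterator[str]:
    await asyncio.sleep(0.02)  # the slower branch still comes first in the output
    yield "label:question" if part.endswith("?") else "label:statement"


async def log(part: str) -> AsyncIterator[str]:
    yield f"log:{part}"


classifier_and_logger = parallel_parts(classify, log)
```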
Parallel Streams (parallel_concat)
For full Processors operating on streams, use parallel_concat. This
broadcasts the stream to multiple agents and concatenates their outputs.
```python
# Both agents receive the full stream context
mixture = processor.parallel_concat([agent_a, agent_b])
```
When iterating over the output of mixture, both agents run in parallel, but the
iterator yields agent_a's results first. Even if agent_b produces output first,
you will not be able to iterate over it until agent_a is done.
To merge the outputs of processors as they arrive, apply the merge function from
the streams module to the output of each processor. See the
Async & Streaming section for more details.
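The difference between parallel_concat and merging can be reproduced with plain asyncio queues. concat_parallel and merge are hypothetical stand-ins written for this sketch; they are not the library's parallel_concat or streams.merge. Both consume every stream concurrently, but only merge releases items in arrival order.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List

_DONE = object()  # end-of-stream sentinel


async def _pump(stream: AsyncIterable[str], queue: asyncio.Queue) -> None:
    # Copy a stream into a queue, always signalling the end (even on error).
    try:
        async for item in stream:
            queue.put_nowait(item)
    finally:
        queue.put_nowait(_DONE)


async def concat_parallel(*streams: AsyncIterable[str]) -> AsyncIterator[str]:
    """Runs all streams concurrently but yields them one whole stream at a time."""
    queues = [asyncio.Queue() for _ in streams]
    tasks = [asyncio.ensure_future(_pump(s, q)) for s, q in zip(streams, queues)]
    try:
        for queue in queues:
            # Later streams keep buffering while earlier ones are drained.
            while (item := await queue.get()) is not _DONE:
                yield item
        await asyncio.gather(*tasks)  # surface any stream error
    finally:
        for task in tasks:
            task.cancel()


async def merge(*streams: AsyncIterable[str]) -> AsyncIterator[str]:
    """Yields items from all streams in the order they are produced."""
    queue: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.ensure_future(_pump(s, queue)) for s in streams]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()


async def ticker(name: str, delay: float, n: int) -> AsyncIterator[str]:
    # Emits name0, name1, ... with `delay` seconds before each item.
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def run(combine) -> List[str]:
    slow, fast = ticker("a", 0.05, 2), ticker("b", 0.01, 2)
    return [item async for item in combine(slow, fast)]
```

With a slow stream a and a fast stream b, run(concat_parallel) returns ["a0", "a1", "b0", "b1"], while run(merge) returns ["b0", "b1", "a0", "a1"].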
Orchestration (Routing)
For advanced flows, you may need to route data dynamically based on its content.
Switch (Stream Routing)
Switch dispatches parts of the input stream to different processors based on
conditions. The condition in each case defines which parts are sent to the
attached processor. Output order matches input order only for parts handled by
the same processor: two parts matching two different cases are not guaranteed to
appear in the same order in the output stream as in the input stream.
```python
from genai_processors import switch

# Route audio to audio_model, everything else to text_model
router = (
    switch.Switch(content_api.get_mimetype)
    .case(content_api.is_audio, audio_model)
    .default(text_model)
)
```
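The ordering rule for Switch follows from running each case's processor as its own task and forwarding outputs as they appear. The sketch below is a hypothetical plain-asyncio model of that behavior, not the library's Switch. Conditions are applied directly to string items rather than to a value extracted by a match function such as content_api.get_mimetype. audio_model and text_model are toy handlers.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable, List, Tuple

Handler = Callable[[AsyncIterable[str]], AsyncIterator[str]]
_END = object()  # end-of-stream sentinel


async def _from_queue(queue: asyncio.Queue) -> AsyncIterator[str]:
    # Expose a queue as an async stream that stops at _END.
    while (item := await queue.get()) is not _END:
        yield item


async def switch(
    stream: AsyncIterable[str],
    cases: List[Tuple[Callable[[str], bool], Handler]],
    default: Handler,
) -> AsyncIterator[str]:
    """Routes each item to the first matching handler; yields outputs as they arrive."""
    handlers = [handler for _, handler in cases] + [default]
    inboxes = [asyncio.Queue() for _ in handlers]
    outbox: asyncio.Queue = asyncio.Queue()

    async def dispatch() -> None:
        try:
            async for item in stream:
                index = next(
                    (i for i, (cond, _) in enumerate(cases) if cond(item)), len(cases)
                )
                inboxes[index].put_nowait(item)
        finally:
            for inbox in inboxes:
                inbox.put_nowait(_END)

    async def run_handler(handler: Handler, inbox: asyncio.Queue) -> None:
        try:
            async for out in handler(_from_queue(inbox)):
                outbox.put_nowait(out)
        finally:
            outbox.put_nowait(_END)

    tasks = [asyncio.ensure_future(run_handler(h, q)) for h, q in zip(handlers, inboxes)]
    tasks.append(asyncio.ensure_future(dispatch()))
    try:
        remaining = len(handlers)
        while remaining:
            out = await outbox.get()
            if out is _END:
                remaining -= 1
            else:
                yield out
        await asyncio.gather(*tasks)  # surface any handler or input error
    finally:
        for task in tasks:
            task.cancel()


async def audio_model(stream: AsyncIterable[str]) -> AsyncIterator[str]:
    async for item in stream:
        await asyncio.sleep(0.05)  # pretend audio is slow
        yield f"transcript of {item}"


async def text_model(stream: AsyncIterable[str]) -> AsyncIterator[str]:
    async for item in stream:
        yield item.upper()


async def demo() -> List[str]:
    async def inputs() -> AsyncIterator[str]:
        for item in ["audio:1", "text:a", "text:b"]:
            yield item

    cases = [(lambda s: s.startswith("audio:"), audio_model)]
    return [out async for out in switch(inputs(), cases, text_model)]
```

Because audio_model is slow, demo() returns the two text outputs before the transcript, even though the audio item came first in the input. Within each handler, order is still preserved.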
PartSwitch (Part Routing)
PartSwitch provides the same part-level dispatching logic as a standard
Switch, but it is implemented as a PartProcessor. It directs individual
parts to different handlers and executes them concurrently. This is particularly
useful in a PartProcessor stack, ensuring that different modalities, like a
mix of heavy images and light text, are processed simultaneously so that one
slow part doesn't bottleneck the rest of the stream.
```python
# Process text tokens with one logic, images with another
part_router = (
    switch.PartSwitch()
    .case(content_api.is_text, text_handler)
    .case(content_api.is_image, image_handler)
    .default(processor.passthrough())
)
```
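Per-part routing reduces to picking the first matching handler for each part. The sketch below is hypothetical and uses (mimetype, data) tuples in place of ProcessorPart. part_switch, the handlers, and run_all are illustrative names. run_all uses asyncio.gather only to show that independently routed parts can run concurrently while results keep input order.

```python
import asyncio
from typing import AsyncIterator, Callable, List, Tuple

Part = Tuple[str, str]  # (mimetype, data): a hypothetical stand-in for ProcessorPart
PartHandler = Callable[[Part], AsyncIterator[Part]]


def part_switch(
    cases: List[Tuple[Callable[[Part], bool], PartHandler]], default: PartHandler
) -> PartHandler:
    """Returns a per-part handler that delegates to the first matching case."""

    async def route(part: Part) -> AsyncIterator[Part]:
        handler = next((h for cond, h in cases if cond(part)), default)
        async for out in handler(part):
            yield out

    return route


def is_text(part: Part) -> bool:
    return part[0].startswith("text/")


def is_image(part: Part) -> bool:
    return part[0].startswith("image/")


async def text_handler(part: Part) -> AsyncIterator[Part]:
    yield ("text/plain", part[1].upper())


async def image_handler(part: Part) -> AsyncIterator[Part]:
    await asyncio.sleep(0.05)  # pretend image work is heavy
    yield ("text/plain", f"caption for {part[1]}")


async def passthrough(part: Part) -> AsyncIterator[Part]:
    yield part


part_router = part_switch([(is_text, text_handler), (is_image, image_handler)], passthrough)


async def run_all(parts: List[Part]) -> List[Part]:
    async def one(part: Part) -> List[Part]:
        return [out async for out in part_router(part)]

    # Parts are routed independently, so they can run concurrently; gather keeps input order.
    results = await asyncio.gather(*(one(p) for p in parts))
    return [out for outs in results for out in outs]
```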
Sources
Processors can ingest data from any AsyncIterator, but for convenience, we provide built-in sources for common inputs like microphones, cameras, screen captures, and terminals.
Because mixing these inputs is a frequent requirement, we’ve designed Sources to
implement the Processor interface. This allows you to combine them using the +
operator, for example: TerminalInput('>') + audio_io.AudioIn(...) +
live_model.LiveModel(...). Most sources, especially those tailored for
real-time use, will stay active as long as their input stream is open. If you
need a pipeline to run indefinitely, you can use streams.endless_stream() to
keep the sources alive until they are explicitly cancelled or ended.
If none of the built-in sources fits your needs, you can define your own
source (e.g., from a microphone, a queue, or a file) using the
@processor.source decorator:
```python
@processor.source
async def my_source(filepath: str) -> AsyncIterable[content_api.ProcessorPart]:
    # read_file is a placeholder for any async file reader.
    data = await read_file(filepath)
    yield content_api.ProcessorPart(data)

# Usage:
# p = my_source("data.txt") + simple_filter + model
# async for part in p(streams.endless_stream()):
#     ...
```
Context & Error Propagation
When a processor in a chain fails, it is vital to terminate all concurrently
running tasks and notify the source that data processing has stopped. To achieve
this, pipelines should always be executed within an async with
processor.context(): block. This context defines a TaskGroup, ensuring that if
one part of the pipeline fails, the entire chain is cleaned up and no background
tasks are left "hanging" as zombies.
```python
async with processor.context():
    # Run the pipeline
    async for part in my_processor_pipeline(input_stream):
        process_result(part)
```
Yielding exceptions as parts
For complex pipelines, terminating them when something goes wrong might not be
an option: there are too many cogs to expect all of them to work flawlessly.
Instead, we may want to let an LLM deal with the problem, autocorrect, or try a
different approach. In this case, wrap a processor with the
@processor.yield_exceptions_as_parts decorator. This will catch any exception
inside the processor and yield it as a ProcessorPart to the output stream.
```python
import random
from typing import AsyncIterable

from genai_processors import content_api
from genai_processors import mime_types
from genai_processors import processor


@processor.processor_function
@processor.yield_exceptions_as_parts
async def faulty_processor(
    content: content_api.ProcessorStream,
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    """This processor will sometimes fail."""
    if random.random() < 0.5:
        raise ValueError("Something went wrong")
    yield "Hello"


@processor.processor_function
async def recover_from_error(
    content: content_api.ProcessorStream,
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    """This processor will recover from an error."""
    async for part in content:
        if part.mimetype == mime_types.TEXT_EXCEPTION:
            yield f"Processor failed with: {part.text}, trying something else."
        else:
            yield part
```
With @yield_exceptions_as_parts, faulty_processor will not terminate the
pipeline when it fails. Instead, it will output a ProcessorPart with mimetype
text/x-exception that the recover_from_error processor can use to react to the
failure.
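The core of a decorator like @processor.yield_exceptions_as_parts can be sketched as a wrapper that catches exceptions from an async generator and yields them as a final part. This is a hypothetical illustration, not the library's implementation. Part is a stand-in for ProcessorPart, the exception text format is an assumption, and flaky is a deterministic variant of faulty_processor.

```python
import asyncio
import functools
from dataclasses import dataclass
from typing import AsyncIterator, Callable, List

TEXT_EXCEPTION = "text/x-exception"  # mimetype named in the text above


@dataclass
class Part:
    # Hypothetical stand-in for ProcessorPart.
    text: str
    mimetype: str = "text/plain"


def yield_exceptions_as_parts(fn: Callable[..., AsyncIterator[Part]]):
    """Wraps an async generator so that a raised exception becomes a final Part."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs) -> AsyncIterator[Part]:
        try:
            async for part in fn(*args, **kwargs):
                yield part
        except Exception as e:  # CancelledError is not an Exception, so it still propagates
            yield Part(text=str(e), mimetype=TEXT_EXCEPTION)

    return wrapper


@yield_exceptions_as_parts
async def flaky(fail: bool) -> AsyncIterator[Part]:
    # Deterministic variant of faulty_processor: fails on demand, not at random.
    yield Part("started")
    if fail:
        raise ValueError("Something went wrong")
    yield Part("Hello")


async def collect(stream: AsyncIterator[Part]) -> List[Part]:
    return [part async for part in stream]
```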
Background Tasks
If you need to perform a "fire-and-forget" operation (like saving to a DB)
without blocking the main generation flow, use processor.create_task().
```python
async def process_with_background():
    # save_to_db, results, model and input_stream are placeholders.
    async with processor.context():
        # Schedule the background task (fire and forget).
        # It is tracked by the context and cleaned up automatically.
        processor.create_task(save_to_db(results))
        # Yield generation results immediately.
        async for part in model(input_stream):
            yield part
    # When exiting the context, the task is awaited by the context.
```
How the Lifecycle is Managed
To keep your pipelines robust and memory-safe, the processor.context() follows
a few key principles:
- Automatic Cleanup: You don't have to worry about manual teardown. When the context exits, all background tasks, including buffers and parallel branches, are gracefully cleaned up for you.
- Smart Error Handling: If a processor encounters an exception, the context acts as a safety net. It immediately cancels any remaining tasks in the chain and propagates the error, preventing "zombie" processes from lingering in the background.
- Integrated Task Management: To ensure your background work stays synced with the pipeline’s lifecycle, we recommend using processor.create_task(). Unlike a standard asyncio.create_task(), this keeps your work attached to the processor’s context so it can be managed and cleaned up automatically.
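One way a context can keep fire-and-forget tasks attached to it is a contextvars-based registry. create_task records each task, and leaving the context either awaits the tasks or, on error, cancels them. context, create_task, and save_to_db below are hypothetical stand-ins for processor.context() and processor.create_task(). The library's internals may differ.

```python
import asyncio
import contextlib
import contextvars
from typing import Coroutine, List, Optional, Set

# Tasks registered with the innermost active context(), if any.
_tasks: contextvars.ContextVar[Optional[Set[asyncio.Task]]] = contextvars.ContextVar(
    "_tasks", default=None
)


def create_task(coro: Coroutine) -> asyncio.Task:
    """Like asyncio.create_task, but attaches the task to the enclosing context()."""
    tasks = _tasks.get()
    if tasks is None:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise RuntimeError("create_task() must be called inside context()")
    task = asyncio.create_task(coro)
    tasks.add(task)
    return task


@contextlib.asynccontextmanager
async def context():
    tasks: Set[asyncio.Task] = set()
    token = _tasks.set(tasks)
    try:
        yield
        # Normal exit: wait for the fire-and-forget work to finish.
        await asyncio.gather(*tasks)
    except BaseException:
        # Error or cancellation: cancel leftover work so nothing lingers.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    finally:
        _tasks.reset(token)


saved: List[str] = []


async def save_to_db(item: str) -> None:
    await asyncio.sleep(0.01)  # pretend this is a slow write
    saved.append(item)


async def main() -> List[str]:
    streamed = []
    async with context():
        create_task(save_to_db("result"))
        streamed.append("part")  # the main flow continues without waiting
    # Leaving the block awaited the background save.
    return streamed
```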