Pipecat 101 · sibling to the Python Concurrency series

One Loop, Twenty Milliseconds

How Pipecat moves real-time audio through a dozen AI services on a single asyncio event loop — and why every line of it is engineered around one rule: never block.

~16 min read · Python 3.11+ · asyncio · Assumes: you know what an event loop is

A voice agent is a soft real-time system wearing an AI costume. The model picking the next word is the part everyone talks about; the part that decides whether the product feels alive is whether a 20-millisecond chunk of microphone audio can cross the whole machine and come back as speech before the human notices the seam. Pipecat is the machinery that makes that crossing, and almost every interesting decision in it is a concurrency decision.

Here's the frame that the rest of this post hangs on. The transport hands you raw PCM in small chunks: at 16 kHz mono with 16-bit samples, a 20 ms chunk is 640 bytes, and it arrives fifty times a second, forever, for the length of the call. Those chunks have to flow through voice-activity detection, a speech-to-text service, a context aggregator, an LLM, a text-to-speech service, and back out to the speaker. A neural net and a chain of network services, with a deadline measured against human perception. And it all runs on one asyncio event loop.
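The chunk arithmetic is worth checking by hand. A minimal sketch, assuming 16-bit (2-byte) PCM samples, which is what the 640-byte figure implies; the function names are illustrative and not part of Pipecat:

```python
def chunk_bytes(sample_rate_hz: int, chunk_ms: int, channels: int = 1,
                bytes_per_sample: int = 2) -> int:
    """Size in bytes of one PCM chunk (16-bit samples assumed by default)."""
    samples = sample_rate_hz * chunk_ms // 1000
    return samples * channels * bytes_per_sample


def chunks_per_second(chunk_ms: int) -> int:
    """How many chunks of this duration arrive each second."""
    return 1000 // chunk_ms


if __name__ == "__main__":
    print(chunk_bytes(16_000, 20))    # 640
    print(chunks_per_second(20))      # 50
```

Doubling the sample rate or the channel count doubles the bytes per chunk, but the cadence stays at fifty deadlines a second, and the cadence is what the event loop has to keep up with.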

That last clause is the whole story. One loop means cooperative scheduling: nothing is preempted, every coroutine runs until it chooses to await, and while it runs nothing else does. If any step in that chain blocks the loop for 100 ms — a synchronous HTTP call, a numpy operation on the main thread, a time.sleep someone left in — then for 100 ms no audio moves, no VAD fires, and the interruption the user just spoke goes unheard. The glitch is audible. Pipecat's architecture is, top to bottom, a set of answers to "how do we keep the loop free."
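To make the stall concrete, here is a small experiment, not Pipecat code: a heartbeat coroutine stands in for the audio path and records the gap between its own wake-ups while another coroutine either blocks or yields for 100 ms. All names are hypothetical.

```python
import asyncio
import time


async def heartbeat(gaps: list, stop: asyncio.Event, period: float = 0.005) -> None:
    """Stand-in for the audio path: wakes every `period` seconds, records the gap."""
    last = time.monotonic()
    while not stop.is_set():
        await asyncio.sleep(period)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


async def blocking_stall() -> None:
    time.sleep(0.1)              # blocks the whole loop: no other task can run


async def cooperative_stall() -> None:
    await asyncio.sleep(0.1)     # yields: the heartbeat keeps ticking


async def worst_gap(stall) -> float:
    """Run the heartbeat next to `stall`; return the longest gap between beats."""
    gaps: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.02)    # let the heartbeat settle
    await stall()
    await asyncio.sleep(0.02)    # let it record the beat that follows the stall
    stop.set()
    await beat
    return max(gaps)


if __name__ == "__main__":
    print("blocking   :", asyncio.run(worst_gap(blocking_stall)))
    print("cooperative:", asyncio.run(worst_gap(cooperative_stall)))
```

The blocking variant should report a worst gap of at least the full 100 ms, while the cooperative one stays near the heartbeat period. Same duration of "work", very different effect on everything else sharing the loop.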

From Go

If you've built pipelines in Go, the mental model is close: each stage is a goroutine, frames are values on channels, and push_frame is a channel send. The load-bearing difference is the scheduler. Go preempts — a goroutine stuck in a tight loop still yields because the runtime interrupts it. asyncio does not. A Pipecat processor that forgets to await is a goroutine the scheduler can never take the CPU back from. Cooperative scheduling is a contract, and the whole codebase is written to honour it.

§1 The 20ms budget

Start with the clock, because the clock is what makes this hard. Conversational turn-taking breaks down somewhere north of ~300 ms of latency between you finishing a sentence and the bot starting its reply — past that, people start talking over each other the way they do on a bad phone line. That 300 ms is the budget for the entire round trip: end-of-speech detection, the final STT tokens, the LLM's first token, the first TTS audio, network on both ends. Every component is fighting for a slice of it.

Which means the scheduler's job is not "go fast." It's "never go away." A component that's fast on average but occasionally stalls the loop for 80 ms is worse than one that's uniformly a bit slower, because the stall lands on whatever audio chunk was mid-flight and you hear it. The enemy is not latency; it's latency variance injected into the event loop. Hold that distinction — it explains why Pipecat reaches for threads in the three specific places it does and nowhere else.

§2 Frames are messages

Nothing in a Pipecat pipeline calls anything else directly. Everything is a Frame pushed from one processor to the next. The base class is deliberately thin — it's a dataclass that stamps every instance with an id, a name, and a presentation timestamp:

@dataclass
class Frame:
    id: int = field(init=False)
    name: str = field(init=False)
    pts: Optional[int] = field(init=False)        # presentation timestamp, nanoseconds
    metadata: Dict[str, Any] = field(init=False)

    def __post_init__(self):
        self.id = obj_id()
        self.name = f"{self.__class__.__name__}#{obj_count(self)}"
        self.pts = None
        self.metadata = {}

pipecat-core/src/pipecat/frames/frames.py — the base Frame. Audio, transcripts, LLM tokens, lifecycle signals: all of them subclass this.
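The `field(init=False)` trick is what lets subclasses add their own constructor arguments without inheriting any from the base. A self-contained sketch of the pattern, assuming a single global counter in place of Pipecat's `obj_id()` and `obj_count()` helpers (the counter and the `TextFrame` shape here are illustrative only):

```python
from dataclasses import dataclass, field
from itertools import count

_next_id = count(1)              # hypothetical stand-in for Pipecat's obj_id()


@dataclass
class Frame:
    # init=False keeps these out of __init__, so subclasses can declare
    # required fields without tripping dataclass default-ordering rules.
    id: int = field(init=False)
    name: str = field(init=False)

    def __post_init__(self) -> None:
        self.id = next(_next_id)
        self.name = f"{type(self).__name__}#{self.id}"


@dataclass
class TextFrame(Frame):
    text: str                    # the subclass payload; the only constructor argument


if __name__ == "__main__":
    frame = TextFrame("hello")
    print(frame.name, frame.text)
```

Every frame gets stamped in `__post_init__`, which the generated `__init__` of each subclass calls automatically, so no subclass has to remember to do it.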

The subclasses are where the interesting taxonomy lives, and it's not organised by content — it's organised by scheduling priority. There are three branches:

| Category | Examples | Scheduling meaning |
| --- | --- | --- |
| SystemFrame | InputAudioRawFrame, StartInterruptionFrame, CancelFrame, StartFrame | Out-of-band. Jumps the queue. Processed even while normal frames are paused. |
| DataFrame | OutputAudioRawFrame, TextFrame, LLMTextFrame, TTSAudioRawFrame | The ordered payload. FIFO. Interruptible: dropped when the user barges in. |
| ControlFrame | EndFrame, LLMFullResponseStartFrame, TTSStartedFrame | Ordered like data, but signals state transitions rather than carrying payload. |

Why does a base class carry a notion of priority at all? Because of the central tension in any streaming system: most frames want strict ordering — you cannot play TTS audio out of sequence — but a few frames are about ordering and must violate it. "The user just interrupted you" is useless if it waits politely behind the three seconds of bot speech already queued up. The SystemFrame / DataFrame split is how Pipecat lets one channel carry both the payload and the control signals that need to overtake it. The mechanism that enforces it lives one layer down.

§3 A processor is two tasks and a queue

The FrameProcessor is the atom of the system — every STT service, every aggregator, every transport is one. And the thing worth internalising is that a processor is not a function you call. It's a small concurrent machine: two asyncio tasks reading from two queues, wired so that out-of-band signals can overtake the payload stream.

Here is the state set up in __init__, stripped to the load-bearing fields:

# Input side: a PRIORITY queue. System frames sort ahead of everything.
self.__input_queue = FrameProcessorQueue()
self.__input_event: Optional[asyncio.Event] = None
self.__input_frame_task: Optional[asyncio.Task] = None

# Process side: a plain FIFO queue for ordered data/control frames.
self.__process_queue = asyncio.Queue()
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None

pipecat-core/src/pipecat/processors/frame_processor.py — two queues, two events, two tasks. The double-underscore name-mangling is deliberate: subclasses must not touch these.

The input queue isn't an ordinary asyncio.Queue. It's a FrameProcessorQueue, a subclass of PriorityQueue with exactly two tiers and a monotonic counter to keep insertion order within a tier:

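class FrameProcessorQueue(asyncio.PriorityQueue):
    HIGH_PRIORITY = 1
    LOW_PRIORITY  = 2

    def __init__(self):
        super().__init__()
        self.__high_counter = 0      # per-tier counters keep FIFO order within a tier
        self.__low_counter = 0

    async def put(self, item):
        frame, _, _ = item           # item is (frame, direction, callback)
        if isinstance(frame, SystemFrame):
            self.__high_counter += 1
            await super().put((self.HIGH_PRIORITY, self.__high_counter, item))
        else:
            self.__low_counter += 1
            await super().put((self.LOW_PRIORITY, self.__low_counter, item))

    async def get(self):
        _, _, item = await super().get()   # strip (priority, counter) before handing back
        return item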

A SystemFrame always sorts ahead of a DataFrame, regardless of arrival time. Within a tier, the counter preserves FIFO. This is the queue-jump from §2, made concrete.
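A runnable miniature shows the queue-jump in isolation. This is a sketch, not the Pipecat class: the frame types are stand-ins, items are bare frames rather than tuples, and a single shared counter replaces the two per-tier counters (it preserves insertion order within a tier just the same).

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    label: str


class SystemFrame(Frame): ...
class DataFrame(Frame): ...


class TwoTierQueue(asyncio.PriorityQueue):
    HIGH, LOW = 1, 2

    def __init__(self) -> None:
        super().__init__()
        self._counter = 0        # unique tie-breaker: frames themselves are never compared

    async def put(self, frame: Frame) -> None:
        self._counter += 1
        tier = self.HIGH if isinstance(frame, SystemFrame) else self.LOW
        await super().put((tier, self._counter, frame))

    async def get(self) -> Frame:
        _, _, frame = await super().get()
        return frame


async def demo() -> list:
    queue = TwoTierQueue()
    for frame in (DataFrame("audio-1"), DataFrame("audio-2"),
                  SystemFrame("interrupt"), DataFrame("audio-3")):
        await queue.put(frame)
    return [(await queue.get()).label for _ in range(4)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interrupt was enqueued third and comes out first; the three audio frames keep their relative order behind it.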

Now the two tasks. The input task pulls from the priority queue. If the frame is a system frame, it handles it right there. If it's anything else, it forwards it to the FIFO process queue and moves on:

async def __input_frame_task_handler(self):
    while True:
        (frame, direction, callback) = await self.__input_queue.get()

        if self.__should_block_system_frames and self.__input_event:
            await self.__input_event.wait()        # yields the loop; never spins
            self.__input_event.clear()
            self.__should_block_system_frames = False

        if isinstance(frame, SystemFrame):
            await self.__process_frame(frame, direction, callback)   # handle now
        elif self.__process_queue:
            await self.__process_queue.put((frame, direction, callback))  # defer

        self.__input_queue.task_done()

The input task is the fast lane. A system frame never waits behind buffered audio — it's processed the moment it's dequeued.

The process task is the slow lane — it drains the FIFO queue and runs the actual work (the call out to the LLM, the TTS synthesis) for each ordered frame. Crucially it tracks the frame currently in flight, because that's the one an interruption will need to cancel:

async def __process_frame_task_handler(self):
    while True:
        self.__process_current_frame = None
        (frame, direction, callback) = await self.__process_queue.get()
        self.__process_current_frame = frame                  # remember what's in flight

        if self.__should_block_frames and self.__process_event:
            await self.__process_event.wait()
            self.__process_event.clear()

        await self.__process_frame(frame, direction, callback)
        self.__process_queue.task_done()

Two tasks per processor, and a pipeline of ~10 processors is ~20 tasks — all on the one loop. None of them ever blocks; every wait above is an `await` on a queue or an Event.

Anatomy of one processor
Frames enter the priority queue. System frames are handled inline by the input task; everything else is forwarded to the FIFO queue and the process task. Two tasks, two queues, one direction of flow shown.
Diagram: frames in → priority queue (system ahead of data) → input task (triage; SystemFrame handled now) → FIFO queue (data / control) → process task (the real work) → push_frame → next processor

Why two tasks instead of one? Collapse them and you get a single loop that pulls a frame, does the work, pulls the next. Now an interruption frame arriving mid-synthesis sits in the queue behind the audio the synthesis is producing — it can't be seen until the current work finishes, which is exactly the latency you were trying to kill. Splitting "notice the signal" (input task) from "do the work" (process task) is what lets a control frame land while the slow lane is busy. The price is two tasks per processor and the discipline of never sharing mutable state between them without an asyncio.Event to coordinate — which is precisely what those __input_event / __process_event fields are for.
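The argument is easier to believe once you watch it happen. Below is a toy two-lane processor, a sketch of the shape rather than Pipecat's `FrameProcessor`: plain queues, no priority tier, and a 50 ms sleep standing in for slow work. Every name in it is hypothetical.

```python
import asyncio


class Frame:
    def __init__(self, label: str) -> None:
        self.label = label


class SystemFrame(Frame): ...
class DataFrame(Frame): ...


class TinyProcessor:
    """Toy processor: an input lane that triages and a work lane that is slow."""

    def __init__(self) -> None:
        self.log: list = []
        self._input: asyncio.Queue = asyncio.Queue()
        self._work: asyncio.Queue = asyncio.Queue()
        self._tasks: list = []

    def start(self) -> None:
        self._tasks = [asyncio.create_task(self._input_lane()),
                       asyncio.create_task(self._work_lane())]

    async def stop(self) -> None:
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def queue_frame(self, frame: Frame) -> None:
        await self._input.put(frame)

    async def _input_lane(self) -> None:
        while True:
            frame = await self._input.get()
            if isinstance(frame, SystemFrame):
                self.log.append(f"system:{frame.label}")   # handled immediately
            else:
                await self._work.put(frame)                # deferred to the slow lane

    async def _work_lane(self) -> None:
        while True:
            frame = await self._work.get()
            await asyncio.sleep(0.05)                      # stand-in for slow work
            self.log.append(f"done:{frame.label}")


async def demo() -> list:
    proc = TinyProcessor()
    proc.start()
    await proc.queue_frame(DataFrame("a"))
    await proc.queue_frame(DataFrame("b"))
    await asyncio.sleep(0.01)                  # the slow lane is now busy with "a"
    await proc.queue_frame(SystemFrame("interrupt"))
    await asyncio.sleep(0.2)                   # let everything drain
    await proc.stop()
    return proc.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interrupt arrives after both data frames yet is logged before either finishes, because the input lane was idle and free to see it. Fold the two lanes into one loop and the same interrupt would be logged last.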

§4 The pipeline is a linked list of tasks

A Pipeline is itself a FrameProcessor — a composite. It takes the list of processors you hand it and links them into a doubly-linked chain:

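# On the Pipeline: walk the list once, linking each processor to its successor.
def _link_processors(self):
    prev = self._processors[0]
    for curr in self._processors[1:]:
        prev.link(curr)
        prev = curr

# On each processor: record both neighbours, so frames can travel either way.
def link(self, processor):
    self._next = processor
    processor._prev = self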

Each processor knows its `_next` and `_prev`. That's the whole topology — a chain.

Frames travel in a FrameDirection. DOWNSTREAM follows _next (audio in → speech out); UPSTREAM follows _prev and is how a deep processor sends a signal back toward the source — an error, or the all-important "the user started talking, everyone upstream of me needs to know." push_frame is the one verb every processor uses, and it does nothing but enqueue onto the neighbour:

async def __internal_push_frame(self, frame, direction):
    if direction == FrameDirection.DOWNSTREAM and self._next:
        await self._next.queue_frame(frame, direction)
    elif direction == FrameDirection.UPSTREAM and self._prev:
        await self._prev.queue_frame(frame, direction)

`queue_frame` is `put` onto the neighbour's priority queue. So a frame's trip through the pipeline is N queue hops across N pairs of tasks — not N function calls on one stack.

This is the part that rewires your intuition. A frame's journey from the microphone to the model is not a call stack. There is no single coroutine that holds "the audio chunk" from top to bottom. Each hop is a hand-off: processor A's process task awaits a put onto processor B's input queue, then loops back for its next frame; B's input task picks it up on its own schedule. The audio chunk is a baton, and the runtime is free to interleave fifty other batons between any two hops. That's what keeps the loop responsive — and it's also why you can never reason about a Pipecat bug by reading a stack trace. The causal chain lives in the queues, not the stack.
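A stripped-down chain makes the hand-off visible. This sketch gives each hop one queue and one task instead of Pipecat's two of each, and uses strings as frames; the class and stage names are illustrative assumptions.

```python
import asyncio
from enum import Enum


class Direction(Enum):
    DOWNSTREAM = 1
    UPSTREAM = 2


class Hop:
    """Toy processor: one queue, one task, a neighbour on either side."""

    def __init__(self, name: str, trace: list) -> None:
        self.name = name
        self.trace = trace
        self.queue: asyncio.Queue = asyncio.Queue()
        self._next = None
        self._prev = None

    def link(self, other: "Hop") -> None:
        self._next = other
        other._prev = self

    async def queue_frame(self, frame: str, direction: Direction) -> None:
        await self.queue.put((frame, direction))

    async def push_frame(self, frame: str, direction: Direction) -> None:
        neighbour = self._next if direction is Direction.DOWNSTREAM else self._prev
        if neighbour is not None:
            await neighbour.queue_frame(frame, direction)

    async def run(self) -> None:
        while True:
            frame, direction = await self.queue.get()
            self.trace.append(f"{self.name}:{frame}")
            await self.push_frame(frame, direction)    # hand off, then loop for the next


async def demo() -> list:
    trace: list = []
    hops = [Hop(name, trace) for name in ("stt", "llm", "tts")]
    for prev, curr in zip(hops, hops[1:]):
        prev.link(curr)
    tasks = [asyncio.create_task(hop.run()) for hop in hops]

    await hops[0].queue_frame("audio", Direction.DOWNSTREAM)
    await asyncio.sleep(0.01)
    await hops[-1].queue_frame("error", Direction.UPSTREAM)
    await asyncio.sleep(0.01)

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return trace


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No coroutine in this sketch ever holds the frame for more than one hop. If `tts` raised while handling "audio", the traceback would show `tts.run` and nothing about `stt` or `llm`, which is the stack-trace blindness described above.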

One chunk's journey
Follow a single 20ms audio chunk through a cascade pipeline. Each step is a queue hand-off to a different pair of tasks — and a chance for the loop to run something else.

§5 The cardinal sin, and the escape hatch

Now the rule the whole post has been circling. Nothing in the hot path may block the event loop. Every process_frame is async def; every wait is an await on a queue or an Event; timing is asyncio.sleep, never time.sleep. The output transport's clock task, which holds each audio frame until its presentation timestamp arrives, sleeps the difference asynchronously so the loop stays free the entire time it waits.
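The clock-task idea reduces to a few lines. This is a sketch under assumptions, not the transport's actual code: it takes timestamps in nanoseconds relative to a session start captured with `time.monotonic_ns()`, and the function name is hypothetical.

```python
import asyncio
import time


async def hold_until(pts_ns: int, start_ns: int) -> float:
    """Sleep until a frame's presentation time; return the delay that was slept."""
    elapsed_ns = time.monotonic_ns() - start_ns
    delay_s = max(0.0, (pts_ns - elapsed_ns) / 1e9)   # late frames go out at once
    await asyncio.sleep(delay_s)                      # yields the loop while waiting
    return delay_s


if __name__ == "__main__":
    start = time.monotonic_ns()
    print(asyncio.run(hold_until(200_000_000, start)))   # roughly 0.2, minus startup time
```

The important line is the `await asyncio.sleep`: while this coroutine waits for its timestamp, every other task on the loop keeps running. Swap it for `time.sleep` and each held frame becomes a stall for the whole pipeline.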

But some work genuinely can't be made async, and pretending otherwise is how you get the 80 ms stall. Voice-activity detection runs a neural net on every audio chunk. That's CPU-bound, synchronous, and runs fifty times a second. Run it inline and you've serialised the whole loop behind a model. Pipecat's answer is a thread — and exactly one thread per analyzer:

# One thread per analyzer: a single audio stream is processed in order,
# so a one-worker pool gives offload without reordering.
self._executor = ThreadPoolExecutor(max_workers=1)

async def analyze_audio(self, buffer: bytes) -> VADState:
    loop = asyncio.get_running_loop()
    state = await loop.run_in_executor(self._executor, self._run_analyzer, buffer)
    return state

pipecat-core/src/pipecat/audio/vad/vad_analyzer.py — the inference runs on a worker thread; the coroutine awaits its result without blocking the loop.

This is where your knowledge of the GIL stops being trivia and starts being load-bearing. Offloading pure-Python CPU work to a thread buys you almost nothing — the GIL serialises it against the loop anyway, and you've added thread-switch overhead for the privilege. It works here because the inference call drops into ONNX/numpy/Torch, which release the GIL around their native compute. While the worker thread is down in C doing matrix multiplies, the main thread holds the GIL and the event loop keeps moving audio. The thread offload is only correct because the workload releases the lock; the same pattern around a tight pure-Python loop would be a quiet performance regression.

The pattern, and its three homes

Pipecat reaches for threads in precisely the places where the work is synchronous and releases the GIL: VAD inference and smart-turn detection (run_in_executor with a one-worker pool each), and image resizing in the output transport (PIL's resize, also ThreadPoolExecutor(max_workers=1)). Image encode/decode for vision frames uses asyncio.to_thread. Everywhere else — the LLM call, the TTS stream, the transport socket — the work is I/O and stays as plain coroutines. If you find yourself adding a thread to a Pipecat processor, ask first whether the work releases the GIL. If it doesn't, the thread is a lie.

The max_workers=1 is not timidity, it's correctness. One analyzer handles one audio stream, and that stream's chunks must be processed in order. A pool of N workers would let chunk 7's inference finish before chunk 5's, scrambling the VAD state machine. The single worker gives you the offload without giving up ordering — a thread pool used as a serial executor, which is a sharper tool than it first looks.
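The ordering claim can be checked directly. In this sketch `time.sleep` stands in for native inference (it releases the GIL, as the real workload is assumed to), and earlier chunks are deliberately made slower than later ones; the function names are illustrative.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def make_analyzer(order: list):
    def analyze(chunk_id: int, cost_s: float) -> int:
        time.sleep(cost_s)          # stand-in for native inference; releases the GIL
        order.append(chunk_id)      # record completion order on the worker thread
        return chunk_id
    return analyze


async def run_chunks(max_workers: int) -> list:
    """Submit three chunks at once; return the order in which they completed."""
    order: list = []
    analyze = make_analyzer(order)
    loop = asyncio.get_running_loop()
    costs = [0.09, 0.05, 0.01]      # chunk 0 is slowest, chunk 2 fastest
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        await asyncio.gather(*(
            loop.run_in_executor(pool, analyze, chunk_id, cost)
            for chunk_id, cost in enumerate(costs)
        ))
    return order


if __name__ == "__main__":
    print("one worker   :", asyncio.run(run_chunks(1)))
    print("three workers:", asyncio.run(run_chunks(3)))
```

With one worker the chunks complete in submission order no matter how long each takes. With three, the fast chunk finishes first and a stateful consumer would see them scrambled.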

§6 Interruptions, where the design pays rent

Everything above — the two tasks, the priority queue, the system-frame fast lane — exists for this section. Barge-in is the feature that justifies the architecture.

The bot is three seconds into a paragraph. The user says "no, wait." Here's what has to happen, and fast: VAD on the input transport detects speech and the interruption strategy decides it's a real barge-in, not a cough. A StartInterruptionFrame — a system frame — is pushed. Because it's a system frame, at every processor it lands in the high-priority tier of the input queue and is handled by the input task immediately, overtaking whatever TTS audio is buffered in the FIFO lane. When a processor sees it, it tears down the in-flight work:

async def _start_interruption(self):
    if isinstance(self.__process_current_frame, UninterruptibleFrame):
        self.__reset_process_queue()        # keep the in-flight frame; drop the rest
    else:
        await self.__cancel_process_task()       # cancel mid-synthesis...
        self.__create_process_task()           # ...and start a fresh, empty one

Cancelling the process task kills the in-flight TTS synthesis or LLM stream. A brand-new process task replaces it, draining a now-empty queue.
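Cancel-and-replace is a small pattern once the queues are stripped away. A sketch with a hypothetical `speak` coroutine standing in for TTS synthesis, emitting one word every 50 ms:

```python
import asyncio


async def speak(words: list, spoken: list) -> None:
    """Stand-in for TTS synthesis: emits one word every 50 ms."""
    for word in words:
        await asyncio.sleep(0.05)
        spoken.append(word)


async def demo() -> list:
    spoken: list = []
    task = asyncio.create_task(speak(["one", "two", "three", "four"], spoken))
    while len(spoken) < 2:                 # barge-in lands after the second word
        await asyncio.sleep(0.005)

    task.cancel()                          # cancel mid-synthesis...
    try:
        await task
    except asyncio.CancelledError:
        pass                               # expected: the old task is gone

    task = asyncio.create_task(speak(["ok"], spoken))   # ...and start a fresh one
    await task
    return spoken


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

"three" and "four" are never spoken. The cancellation lands at the `await` inside `speak`, which is also why a synthesis loop that never awaits could not be interrupted at all.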

The queue reset is the subtle bit. You can't just clear() the FIFO — some frames must survive an interruption. A FunctionCallResultFrame, for instance, carries the result of a tool call that already ran; drop it and the LLM's context is now inconsistent with reality. So those frames are marked UninterruptibleFrame, and the reset filters rather than flushes:

def __reset_process_queue(self):
    new_queue = asyncio.Queue()
    while not self.__process_queue.empty():
        item = self.__process_queue.get_nowait()
        frame, _, _ = item                    # queue items are (frame, direction, callback)
        if isinstance(frame, UninterruptibleFrame):
            new_queue.put_nowait(item)        # survivors go back
        self.__process_queue.task_done()
    # swap the surviving frames back in, dropping everything else
    self.__process_queue = new_queue

Interruption drops the bot's queued speech but preserves context-critical frames. The conversation stays consistent even though the audio stopped mid-word.
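Filter-rather-than-flush works as a free function too, which makes it easy to test. A sketch with stand-in frame classes and two-element queue items (the real items also carry a callback); `filter_queue` is a hypothetical name.

```python
import asyncio


class Frame:
    def __init__(self, label: str) -> None:
        self.label = label


class UninterruptibleFrame(Frame): ...


def filter_queue(old: asyncio.Queue) -> asyncio.Queue:
    """Drain `old`, returning a new queue that holds only the uninterruptible items."""
    survivors: asyncio.Queue = asyncio.Queue()
    while not old.empty():
        item = old.get_nowait()
        frame, _direction = item
        if isinstance(frame, UninterruptibleFrame):
            survivors.put_nowait(item)      # order among survivors is preserved
        old.task_done()
    return survivors


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for frame in (Frame("tts-1"), UninterruptibleFrame("tool-result"), Frame("tts-2")):
        queue.put_nowait((frame, "downstream"))
    queue = filter_queue(queue)
    return [queue.get_nowait()[0].label for _ in range(queue.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the check is on the frame inside the item, not on the item itself. Testing the tuple with `isinstance` would quietly drop every frame, survivors included.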

Trace the timing once more and the design clicks. The whole reason the interruption frame can overtake three seconds of buffered audio is that it rode the high-priority tier into the input task, which never queued behind the process task's backlog. If interruptions and audio shared one FIFO, "stop talking" would arrive three seconds late — which is to say, after the bot finished anyway. The two-task split isn't an abstraction tax; it's the only reason barge-in feels instant.

§7 Cancellation is the hard part

A voice session is a long-lived program — it can run for an hour — and a long-lived program lives or dies on its teardown. The processors above spawn tasks freely: an input task, a process task, a receive task in the TTS service, a keepalive task on the websocket. When the call ends, all of them must stop, in order, without leaking. Pipecat centralises this in a TaskManager, and its create_task wrapper encodes two hard-won lessons:

async def run_coroutine():
    try:
        await coroutine
    except asyncio.CancelledError:
        logger.trace(f"{name}: task cancelled")
        raise                              # MUST re-raise, or the cancel is swallowed
    except Exception as e:
        tb = traceback.extract_tb(e.__traceback__)
        logger.error(f"{name} unexpected exception ...")   # a crashed task logs, doesn't vanish

task = self._params.loop.create_task(run_coroutine())
task.set_name(name)
task.add_done_callback(self._task_done_handler)

pipecat-core/src/pipecat/utils/asyncio/task_manager.py: every task is named and tracked. Re-raising CancelledError is non-negotiable; the broad `except Exception` ensures a task that dies on a real error leaves a log line instead of disappearing.

The first lesson, re-raise CancelledError, is the one everyone learns the hard way. asyncio cancels a task by raising CancelledError at its next await. If your handler catches it and doesn't re-raise, the cancellation is simply lost: the task keeps running, and if it loops back into a long wait, await task on the canceller's side can hang indefinitely. The second lesson is the broad except Exception: a task that raises and is never awaited holds its exception until the task object is garbage-collected, at which point asyncio logs at best a late "Task exception was never retrieved", so you discover the dead processor mostly by its absence. Naming every task and routing it through one wrapper means a crash is a log line, not a mystery.
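The difference between swallowing and re-raising is observable from the outside. A minimal experiment with two hypothetical coroutines that differ only in the `raise`:

```python
import asyncio


async def swallows_cancel(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("swallowed")        # BUG: no re-raise, so the cancel is lost
    await asyncio.sleep(0.01)          # the task carries on as if nothing happened
    log.append("still running")


async def reraises_cancel(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise                          # lets the task actually end as cancelled


async def cancel_and_report(coro_fn) -> tuple:
    """Cancel a task built from `coro_fn`; report (task.cancelled(), its log)."""
    log: list = []
    task = asyncio.create_task(coro_fn(log))
    await asyncio.sleep(0)             # let the task reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), log


if __name__ == "__main__":
    print(asyncio.run(cancel_and_report(swallows_cancel)))
    print(asyncio.run(cancel_and_report(reraises_cancel)))
```

The swallowing task finishes normally and reports `cancelled()` as False, so the canceller cannot tell it ever received the request. Had its follow-up work been an infinite loop, `await task` would never have returned.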

The cancel path mirrors this with a timeout, because a well-behaved system still has to defend against a misbehaving dependency:

async def cancel_task(self, task, timeout=None):
    name = task.get_name()
    task.cancel()
    try:
        if timeout:
            await asyncio.wait_for(task, timeout=timeout)
        else:
            await task
    except asyncio.TimeoutError:
        logger.warning(f"{name}: timed out waiting for task to cancel")
    except asyncio.CancelledError:
        pass                               # cancelled cleanly

If a third-party library swallows the cancel and refuses to die, the timeout keeps teardown from hanging the whole session. You log it and move on.
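Here is the bounded-teardown idea as a standalone sketch. One deliberate difference from the excerpt: it uses `asyncio.wait` rather than `asyncio.wait_for`, because `asyncio.wait` only observes the task on timeout and never cancels or waits on it again. The `stubborn` coroutine is a hypothetical misbehaving dependency that ignores cancellation for 200 ms.

```python
import asyncio


async def cancel_with_timeout(task: asyncio.Task, timeout: float) -> bool:
    """Request cancellation; return True if the task finished within `timeout`."""
    task.cancel()
    done, _pending = await asyncio.wait({task}, timeout=timeout)
    return task in done


async def polite() -> None:
    await asyncio.sleep(10)              # cancellation propagates straight through


async def stubborn() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        await asyncio.sleep(0.2)         # ignores the cancel for a while


async def demo() -> tuple:
    results = []
    for fn in (polite, stubborn):
        task = asyncio.create_task(fn())
        await asyncio.sleep(0)           # let the task reach its first await
        results.append(await cancel_with_timeout(task, timeout=0.05))
        await asyncio.wait({task})       # let the straggler finish before exiting
    return tuple(results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller gets a yes or no within 50 ms either way. What it does with a "no" (log, escalate, abandon the task) is a policy decision, but teardown itself no longer depends on the dependency behaving.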

This will bite you

The leak you'll actually hit is not in Pipecat core — it's in a service that spawns its own background tasks and ties their lifetime to a _disconnecting flag rather than to cancellation. ElevenLabs TTS is the canonical example: it runs a _receive_task and a _keepalive_task on a websocket. If the pipeline exits without explicitly cancelling the task tree, those two keep the socket open and the TTFB timers ticking — forever. The fix is mechanical (cancel the pipeline task in a finally, unconditionally) but invisible until you're staring at a slowly climbing connection count. The Pantheon voice module hit exactly this; the companion post traces the fix.
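The mechanical fix looks like this in miniature. The sketch assumes a hypothetical `keepalive` background task that tracks an open socket in a dict; it illustrates the finally-block discipline and is not code from Pipecat or any TTS service.

```python
import asyncio


async def keepalive(state: dict) -> None:
    state["socket_open"] = True
    try:
        while True:
            await asyncio.sleep(0.01)      # stand-in for a websocket ping
    finally:
        state["socket_open"] = False       # runs on cancellation too


async def run_session(state: dict, fail: bool) -> None:
    background = asyncio.create_task(keepalive(state))
    try:
        await asyncio.sleep(0.03)          # stand-in for the pipeline running
        if fail:
            raise RuntimeError("pipeline crashed")
    finally:
        background.cancel()                # unconditional: every exit path lands here
        await asyncio.gather(background, return_exceptions=True)


async def demo() -> list:
    outcomes = []
    for fail in (False, True):
        state: dict = {}
        try:
            await run_session(state, fail)
        except RuntimeError:
            pass
        outcomes.append(state["socket_open"])
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))             # socket closed on both exit paths
```

Move the `background.cancel()` out of the `finally` and into the happy path, and the crashing session leaves `socket_open` stuck at True for as long as the loop lives, which is the slowly climbing connection count in miniature.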

This is the seam between Pipecat-the-engine and Pipecat-in-production. The engine gives you clean cancellation primitives; whether your session leaks comes down to whether your wrapper drives them on every exit path — normal end, timeout, exception, and outside cancellation alike. That wrapper is the entire subject of the next post.

§8 Take with you

Where to go next

Pipecat is the engine. Pantheon's platform/voice is the chassis: a Temporal activity that owns one event loop for up to an hour, bridges WebRTC and a Redis pub/sub audio path into a pipeline, and answers the four questions the engine leaves open — who ends a session, who cancels the task tree, who cleans up the websockets, and how a background agent injects a frame into a live conversation. That's the companion piece.