An Hour on One Loop
Pipecat is the engine. A voice module is the chassis — a Temporal activity that holds one event loop open for half an hour, bridges WebRTC and Redis into a pipeline, and answers the question the engine leaves open: who ends the call, and who cleans up after it?
A client POSTs to /api/voice/join and gets a response in milliseconds: a room name, a token, a workflow id. By the time that JSON is serialised, a second process is already opening — a pipeline that will run, detached, for as long as the conversation lasts, up to a thirty-minute ceiling. The HTTP request is over; the session is just beginning. Everything interesting about the voice module is in the gap between those two facts.
If you read Pipecat 101, you know the engine: frames flowing through a chain of two-task processors on one event loop, never blocking. This post is about the parts Pipecat deliberately doesn't have an opinion on — who owns the loop, how audio gets in and out when there's no neat SDK for your transport, what ends the session, and how a session avoids leaking the dozen background tasks it spawned. These are the production questions, and the architecture below is one good answer: a thin stack of Temporal wrappers around a long-lived asyncio program. The same shape recurs whenever you put a stateful real-time workload behind a workflow engine, so the patterns are worth more than the specific code.
Think of the module as three layers. A controller handles the synchronous /join request and launches a workflow. A workflow is a one-line Temporal wrapper that starts a single long-running activity. The activity is where the real program lives: it builds a Pipecat pipeline, owns the event loop for the session's lifetime, and is the only place asyncio tasks and threads are allowed to exist. The whole post is really about that activity.
§1 The shape of a join
Two transports, selected by a discriminated union on the request body. LiveKit is the direct path — a browser or SDK connects over WebRTC, and the module just mints room tokens. Recall is the indirect path — a headless browser that Recall.ai drives joins a Google Meet as a participant, and audio has to be bridged in and out through a channel that has nothing to do with WebRTC. We'll spend most of our time on Recall, because the indirect path is where the interesting plumbing lives.
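A discriminated union of this kind can be sketched with plain dataclasses. This is a minimal illustration, not the module's actual request models: the `transport` tag field, the `LiveKitJoin` and `RecallJoin` classes, their fields, and `parse_join` are all hypothetical names chosen for the example.

```python
from dataclasses import dataclass
from typing import Literal, Union


@dataclass(frozen=True)
class LiveKitJoin:
    # Hypothetical field: the direct path only needs a room to mint tokens for.
    room_name: str
    transport: Literal["livekit"] = "livekit"


@dataclass(frozen=True)
class RecallJoin:
    # Hypothetical field: the indirect path needs the meeting the bot should join.
    meeting_url: str
    transport: Literal["recall"] = "recall"


JoinRequest = Union[LiveKitJoin, RecallJoin]


def parse_join(body: dict) -> JoinRequest:
    """Pick the variant from the tag field, then build it from the remaining keys.

    Missing or unexpected keys surface as TypeError from the dataclass constructor.
    """
    fields = dict(body)
    tag = fields.pop("transport", None)
    if tag == "livekit":
        return LiveKitJoin(**fields)
    if tag == "recall":
        return RecallJoin(**fields)
    raise ValueError(f"unknown transport: {tag!r}")
```

The point of the tag is that everything downstream can branch on the variant's type once, at the edge, instead of probing for optional fields.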
Inside the pipeline, two topologies. The default is cascade — discrete services chained, exactly the shape from Pipecat 101:
Transport Input (LiveKit | Recall)
  → STT (deepgram | elevenlabs)
  → DelegationNotificationProcessor    # a Redis Pub/Sub listener, §7
  → LLM Context Aggregator (user)
  → LLM (litellm → shared router, or direct anthropic/openai/gemini)
  → TTS (elevenlabs)
  → Transport Output (LiveKit | Recall)
  → LLM Context Aggregator (assistant)
The cascade pipeline. Each arrow is a push_frame hop between processors; tool-calling (delegation) hangs off the LLM service.
The alternative is speech-to-speech (S2S): a single OpenAI Realtime service, audio in and audio out, with no separate STT/LLM/TTS and, for now, no tool calling. Cascade trades latency for control: you can swap the STT vendor, inspect the transcript, register tools. S2S trades control for latency: one multimodal model with its own built-in turn detection. The module supports both behind one config flag, and almost everything below applies to either.
§2 Why an activity, not a workflow
This is the decision that shapes everything, and it's a direct consequence of how Temporal works. A Temporal workflow must be deterministic — it's replayed from an event history to recover state, so it cannot do anything whose result varies between runs. No wall-clock reads, no random, no raw network I/O, and — critically — no spawning your own asyncio tasks or threads. A Pipecat pipeline is nothing but those forbidden things: it creates twenty tasks, opens websockets, sleeps on a clock. It cannot live in a workflow.
So the workflow is a one-line shim, and the pipeline runs in an activity, where arbitrary async code is allowed:
class RunVoicePipelineWorkflow:
    async def execute(self, input) -> RunVoicePipelineWorkflowOutput:
        return await workflow.execute_activity(
            run_voice_pipeline,
            input,
            start_to_close_timeout=timedelta(seconds=PIPELINE_MAX_DURATION_SECONDS),
            heartbeat_timeout=timedelta(seconds=PIPELINE_HEARTBEAT_TIMEOUT_SECONDS),
            retry_policy=RetryPolicy(maximum_attempts=1),  # see the trap below
        )
The workflow exists only to launch the activity with the right timeouts and retry policy. All the logic is one layer down.
Look at maximum_attempts=1. Temporal's default is to retry a failed activity forever, which is the right instinct for an idempotent task and exactly wrong for a voice call. A crashed session can't be resumed — the audio is gone, the user has to reconnect. Retrying it just spawns a fresh pipeline that the disconnected client will never talk to: a zombie. The comment in the source is blunt about why this matters, and it's the hinge for the next section.
A BaseTransport is not Temporal-serialisable — you can't pass a live websocket through an activity boundary. So the transport is born inside the activity, not handed to it. This is why run_voice_pipeline calls TransportRegistry.create_transport(...) as its very first line, rather than receiving a transport as an argument. The activity is the boundary where "serialisable Temporal world" ends and "live stateful objects" begins.
§3 The heartbeat that Pipecat can starve
A long-running Temporal activity has to prove it's alive. It does that by heartbeating — calling activity.heartbeat() periodically — and if Temporal doesn't hear one within the heartbeat timeout, it declares the activity dead and (absent the retry cap above) reschedules it. The voice activity runs the heartbeat as its own asyncio task, alongside the pipeline:
async def run_voice_pipeline(input) -> RunVoicePipelineWorkflowOutput:
    transport = TransportRegistry.create_transport(input.transport_config, input.config)
    heartbeat_task = asyncio.create_task(_heartbeat_loop(input.room_name))
    try:
        return await VoicePipelineService().run(input, transport)
    finally:
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
The activity body. The heartbeat loop and the pipeline are two tasks sharing one event loop. Note the finally: the heartbeat is always cancelled and awaited, never orphaned.
And here's where the two posts collide. The heartbeat loop is just await asyncio.sleep(15); activity.heartbeat(...) on repeat. It only fires if the event loop gives it time. Now recall the cardinal rule from Pipecat 101: never block the loop. If anything in the pipeline blocks — a synchronous call, a busy-loop, a CPU task that should have gone to a thread — the heartbeat task doesn't get scheduled. It misses its window. Temporal concludes the activity has died, kills it, and the loop blocking that started it all just produced a zombie session.
Blocking the event loop doesn't just glitch the audio. It starves the heartbeat, and Temporal kills the whole session for a crime it didn't commit.
This is exactly the failure that maximum_attempts=1 defends against. Picture the chain: a Pipecat busy-loop starves the heartbeat, the activity times out, Temporal retries it under the default policy, and each retry spawns another zombie that also busy-loops. The retry cap turns an unbounded zombie fountain into a single clean failure. The lesson generalises past voice: when you run an asyncio program under a liveness check, the liveness check is itself a coroutine competing for the loop, and blocking the loop kills your monitor before it kills your work.
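The starvation is easy to reproduce without Temporal at all. The sketch below is a stand-in, not the module's code: `heartbeat_loop` records a timestamp where the real loop would call activity.heartbeat(), and `beats_during`, `blocking_work`, and `offloaded_work` are hypothetical helpers. It runs the same 0.2-second job twice, once as a synchronous sleep on the loop and once pushed to a thread, and counts how many heartbeats fired in the meantime.

```python
import asyncio
import time


async def heartbeat_loop(beats: list, interval: float) -> None:
    # Stand-in for the real loop: record a timestamp instead of calling Temporal.
    while True:
        await asyncio.sleep(interval)
        beats.append(time.monotonic())


async def beats_during(work, interval: float = 0.01) -> int:
    """Run `work` next to a heartbeat task and count the beats that fired meanwhile."""
    beats: list = []
    hb = asyncio.create_task(heartbeat_loop(beats, interval))
    await asyncio.sleep(0)  # yield once so the heartbeat task starts its first sleep
    try:
        await work()
        return len(beats)
    finally:
        # Same discipline as the activity body: cancel, then await.
        hb.cancel()
        try:
            await hb
        except asyncio.CancelledError:
            pass


async def blocking_work() -> None:
    time.sleep(0.2)  # synchronous sleep: holds the loop, nothing else can run


async def offloaded_work() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # same work, moved to a thread


async def main() -> tuple:
    return await beats_during(blocking_work), await beats_during(offloaded_work)


if __name__ == "__main__":
    starved, healthy = asyncio.run(main())
    print(f"beats while blocking: {starved}, beats while offloaded: {healthy}")
```

The blocking run records no beats, because the heartbeat task never gets a turn until the synchronous call returns. The offloaded run should record many, though the exact count depends on scheduling. Scale the 0.2 seconds up past the heartbeat timeout and you have the failure described above.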
§4 Audio without an SDK: the Redis bridge
LiveKit hands Pipecat a transport that already speaks audio. Recall does not. Recall's headless browser joins the Meet and exposes raw audio over a pair of WebSocket connections — one inbound, one outbound — and there is no Pipecat transport for "PCM over a websocket I have to manage myself." So the module builds one, and it uses Redis pub/sub as the seam between the WebSocket handlers (which live in FastAPI routes) and the pipeline (which lives in the activity, possibly on a different worker entirely).
The inbound side is a custom FrameProcessor that, on StartFrame, subscribes to its Redis channel and spawns two tasks — a consumer and a watchdog:
async def _start(self) -> None:
    queue, cancel = await broadcaster.subscribe(channel)
    self._cancel_subscription = cancel
    self._consumer_task = asyncio.create_task(self._consume_audio(queue), ...)
    self._timeout_task = asyncio.create_task(self._no_audio_timeout(), ...)

async def _consume_audio(self, queue: asyncio.Queue) -> None:
    try:
        while True:
            chunk: bytes = await queue.get()
            self._audio_received = True
            await self.push_frame(
                InputAudioRawFrame(audio=chunk, sample_rate=16000, num_channels=1)
            )
    except asyncio.CancelledError:
        raise  # re-raise: the lesson from Pipecat 101
The custom transport's start hook. The consumer turns Redis bytes into the same InputAudioRawFrame the LiveKit transport would emit. Downstream, the pipeline can't tell the difference.
That's the elegant part: once a chunk becomes an InputAudioRawFrame, the rest of the pipeline is identical regardless of transport. The bridge's whole job is to make Recall's exotic audio path produce the same frame LiveKit produces, then get out of the way. The outbound processor does the mirror — it takes OutputAudioRawFrames, wraps each in a WAV header (so the browser's Web Audio API can decode them natively), and publishes to the audio-out channel, where the outbound WebSocket relays them back to Recall's microphone.
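The WAV-wrapping step on the outbound side can be sketched with the standard library alone. This is an illustration under assumptions, not the module's processor: `pcm_to_wav` is a hypothetical helper, and the 16-bit mono format at 16 kHz is assumed for the example (the post only states 16 kHz mono for the inbound frames).

```python
import io
import wave


def pcm_to_wav(pcm: bytes, sample_rate: int = 16000, num_channels: int = 1) -> bytes:
    """Wrap raw 16-bit little-endian PCM in a WAV container, entirely in memory."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wav:
        wav.setnchannels(num_channels)
        wav.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM (assumed format)
        wav.setframerate(sample_rate)
        wav.writeframes(pcm)
    # Closing the writer patches the header sizes but leaves the buffer open.
    return buf.getvalue()
```

Each published message is then a complete, self-describing file: 44 bytes of header followed by the samples, which is what lets a browser decode a chunk without any out-of-band knowledge of the sample rate.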
§5 Four ways a call dies
A request/response handler has one way to end: it returns. A long-lived session has many, and getting all of them to converge on a clean teardown is most of the engineering. A voice call ends when the user leaves, when nobody ever joins, when it hits the duration ceiling, or — for Recall — when a bot.done webhook fires. Each path has to reach the same place: an EndFrame in the pipeline, which propagates through every processor and unwinds the session.
Before the mechanism, predict it. Each item below is an ending: what carries the session to its grave?

- Ending 01, nobody ever joins. A _no_audio_timeout task, started alongside the consumer, sleeps for the join timeout and then checks a flag. If _audio_received is still false, it pushes an EndFrame itself. The watchdog and the consumer are siblings; one delivers audio, the other delivers the deadline.
- Ending 02, the duration ceiling. The runner is wrapped in asyncio.wait_for(runner.run(task), timeout=MAX); on TimeoutError it force-cancels. Temporal's start_to_close_timeout is the outer backstop if the activity itself wedges. Belt and suspenders, because a wedged session is worse than a dropped one.
- Ending 03, the bot.done webhook: the meeting ended on Google's side, outside our process entirely. A _session_control_listener task is subscribed; on bot.done it calls task.queue_frame(EndFrame()). The external event becomes an internal frame.
- Ending 04, the user leaves. This path, too, ends in an EndFrame. Every path (timeout, ceiling, webhook, hangup) converges on the same frame, so teardown has exactly one code path to be correct.

Ending 03 is worth seeing in full, because it's a small masterclass in writing a loop that can't leak. The listener has to wait for an event that may never come, while also noticing if the pipeline finished for some other reason, and it must never outlive the session as an orphaned task:
async with asyncio.timeout(PIPELINE_MAX_DURATION_SECONDS):  # hard ceiling on the whole loop
    while True:
        if task.has_finished():
            break  # pipeline ended elsewhere; don't linger
        try:
            raw = await asyncio.wait_for(queue.get(), timeout=2.0)
        except asyncio.TimeoutError:
            continue  # no event in 2s: loop back, re-check has_finished()
        event = json.loads(raw)
        if event.get("event") == "bot.done":
            if not task.has_finished():
                await task.queue_frame(EndFrame())
            break
The session-control listener. The 2-second wait_for is the trick: a blocking queue.get() would never re-check has_finished(), so the listener would outlive a session that ended any other way.
The wait_for(..., timeout=2.0) is the move worth stealing. A bare await queue.get() blocks until a message arrives — but most sessions end without a bot.done at all (the user just leaves), so the message never comes and the listener hangs forever, a leaked task holding a Redis subscription. Wrapping the get in a 2-second timeout converts an indefinite block into a poll: every two seconds it wakes, re-checks task.has_finished(), and exits if the session died elsewhere. The outer asyncio.timeout() is the final guarantee that even a wedged listener dies at the duration ceiling. This is the same disconnect-detection problem the SSE field report dissects — block, don't peek, but bound the block so you stay responsive to the other ways out.
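The nobody-ever-joins ending uses the other bounded wait in the module: the _no_audio_timeout watchdog that runs as a sibling of the audio consumer. Its body isn't shown in the post, so the following is only a guess at the shape, with stand-ins throughout: `AudioInput`, `run`, the `END` sentinel, and the `frames` list (in place of push_frame) are all hypothetical.

```python
import asyncio

END = "EndFrame"  # stand-in for Pipecat's EndFrame


class AudioInput:
    def __init__(self, source: asyncio.Queue, join_timeout: float) -> None:
        self.source = source
        self.join_timeout = join_timeout
        self.audio_received = False
        self.frames: list = []  # stand-in for whatever push_frame feeds downstream

    async def consume(self) -> None:
        # Sibling 1: delivers audio and flips the flag on the first chunk.
        while True:
            chunk = await self.source.get()
            self.audio_received = True
            self.frames.append(chunk)

    async def no_audio_timeout(self) -> None:
        # Sibling 2: delivers the deadline. One sleep, one flag check.
        await asyncio.sleep(self.join_timeout)
        if not self.audio_received:
            self.frames.append(END)


async def run(chunks: list, join_timeout: float = 0.05) -> list:
    source: asyncio.Queue = asyncio.Queue()
    for chunk in chunks:
        source.put_nowait(chunk)
    inp = AudioInput(source, join_timeout)
    consumer = asyncio.create_task(inp.consume())
    watchdog = asyncio.create_task(inp.no_audio_timeout())
    await watchdog       # the deadline passes either way
    consumer.cancel()    # teardown: cancel the sibling, then await it
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return inp.frames
```

With an empty source, `run([])` yields only the end sentinel; with audio queued, the watchdog wakes, sees the flag, and does nothing. The watchdog needs no polling because it has exactly one thing to wait for.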
§6 The leak in the finally block
Now the bug this whole architecture exists to prevent — and that a naïve implementation will hit anyway, because it lives one layer below Pipecat's clean cancellation. The fix is straightforward once you've seen it: the service's run loop wraps the runner and, in a finally, cancels the pipeline task unconditionally.
try:
    await asyncio.wait_for(runner.run(task), timeout=PIPELINE_MAX_DURATION_SECONDS)
except asyncio.TimeoutError:
    await task.cancel(reason="max duration exceeded")
finally:
    # Guarantee processor teardown regardless of how the runner exits
    # (normal EndFrame, timeout, unexpected exception, asyncio cancellation).
    # Without this, background tasks in services like ElevenLabsTTSService
    # (_receive_task, _keepalive_task) stay alive with _disconnecting=False,
    # leaking WebSocket connections and TTFB timers indefinitely.
    try:
        await task.cancel(reason="pipeline teardown")
    except Exception as e:
        logger.error("Error cancelling voice pipeline task", error=str(e))
The runner wrapper. The comment is the postmortem. task.cancel() is idempotent, so calling it in finally is safe even after a clean exit.
An EndFrame flows through the pipeline and politely shuts each processor down — if the processor ties its lifetime to that frame. But a TTS service like ElevenLabs spawns a _receive_task and a _keepalive_task on its websocket, and gates their shutdown on an internal _disconnecting flag that a clean EndFrame path sets — but a timeout or an exception path does not. Miss the explicit task.cancel() and those two tasks keep the socket open and the TTFB timer running after the session is logically over. You don't see it in one call. You see it three hours later as a connection count that only climbs. The fix is the unconditional finally above; the diagnosis is the hard part.
The general shape — and the reason this is the natural sequel to Pipecat 101's cancellation section — is that clean teardown on the happy path is not teardown. A long-lived session exits four ways (§5), and three of them are not the happy path. Anything that frees a resource must run on all of them, which in practice means a finally that drives cancellation idempotently, not an EndFrame handler that only fires when the session ends the way you imagined.
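The difference between the two teardown styles shows up in a few lines of plain asyncio. This is a toy model under stated assumptions, not Pipecat or ElevenLabs code: `FakeTTS` stands in for a service that owns a background socket task, and `run_session`, `demo`, and the `cleanup` switch are hypothetical names that exist only to compare the two behaviours.

```python
import asyncio


class FakeTTS:
    """Stand-in for a service that owns a background socket task."""

    def __init__(self) -> None:
        self._receive_task = None

    async def start(self) -> None:
        self._receive_task = asyncio.create_task(self._receive())

    async def _receive(self) -> None:
        while True:  # pretend to read from a websocket forever
            await asyncio.sleep(0.01)

    async def cancel(self) -> None:
        # Idempotent: a second call finds no task and returns immediately.
        task, self._receive_task = self._receive_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    @property
    def leaked(self) -> bool:
        return self._receive_task is not None and not self._receive_task.done()


async def run_session(tts: FakeTTS, session, max_seconds: float, cleanup: bool) -> None:
    await tts.start()
    try:
        await asyncio.wait_for(session(), timeout=max_seconds)
    except asyncio.TimeoutError:
        pass  # the ceiling fired; any happy-path shutdown never ran
    finally:
        if cleanup:
            await tts.cancel()  # runs on every exit path


async def demo() -> tuple:
    async def wedged() -> None:
        await asyncio.sleep(10)  # never reaches its own shutdown

    naive, safe = FakeTTS(), FakeTTS()
    await run_session(naive, wedged, 0.05, cleanup=False)
    await run_session(safe, wedged, 0.05, cleanup=True)
    result = (naive.leaked, safe.leaked)
    await naive.cancel()  # tidy up the demo's deliberate leak
    return result
```

`asyncio.run(demo())` returns `(True, False)`: the session without the finally still has a live receive task after the timeout, and the one with it does not. Because `cancel()` tolerates being called twice, the same finally is harmless after a clean exit.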
§7 Injecting a frame from the outside
The last piece is the one that makes this more than a phone line: the voice agent can hand a hard task to a background agent and keep talking while it runs. The LLM calls a delegate_to_agent tool, which fires an A2A request to an autonomous agent workflow with a callback URL. That workflow may run for minutes. The voice session does not wait — it carries on. When the background work finishes, it POSTs to a callback endpoint, which publishes to a Redis channel keyed by workflow id. And sitting in the live pipeline is a processor whose entire job is to listen on that channel and splice the result back into the conversation:
async def _consume_callbacks(self) -> None:
    while True:
        raw_message = await self._callback_queue.get()
        callback = parse_voice_delegation_callback(raw_message)
        formatted = format_voice_delegation_callback(callback)
        await self.push_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "user", "content": formatted}],
                run_llm=True,  # append the result AND trigger a fresh turn
            )
        )
The delegation-notification processor. A background consumer task pushes a synthetic frame into the live pipeline, exactly as if the user had spoken.
This is where the frame abstraction earns its keep. The processor's consumer task isn't on the audio path at all — it's woken by a Redis message that could arrive at any moment — yet the only thing it does to influence the conversation is push_frame an LLMMessagesAppendFrame downstream. To the user aggregator and the LLM, an asynchronously-delivered tool result and a spoken sentence are the same kind of event: a frame in the queue. The run_llm=True flag tells the aggregator to not just append the result but trigger a new LLM turn, so the bot speaks up on its own — "okay, that report is ready" — without the user having said anything. A background event, injected into a real-time stream, through the one verb the whole engine is built on.
Notice the consumer is started on StartFrame and torn down on EndFrame/CancelFrame, with the same re-raise-CancelledError discipline as every other task in the module. By now that should read as the house style: every background task is born on a lifecycle frame, cancelled in an unsubscribe, and awaited so it can't outlive the session.
§8 Take with you
- The pipeline runs in a Temporal activity, never a workflow: workflows must be deterministic, and a Pipecat pipeline is twenty tasks, live sockets, and clock sleeps. The activity is the boundary where serialisable Temporal state ends and stateful objects begin.
- The heartbeat is a coroutine competing for the same loop as the pipeline. Block the loop and you starve the heartbeat, and Temporal kills the session for a crime the audio glitch committed. maximum_attempts=1 stops that from fountaining zombies.
- The Recall transport bridges PCM over websockets through Redis pub/sub, and its only real job is to emit the same InputAudioRawFrame LiveKit would. Past the edge, the pipeline is transport-agnostic.
- A long-lived session ends four ways; teardown must converge all of them on one EndFrame. Poll with a bounded wait_for instead of blocking forever, so the loop stays responsive to the other exits.
- Clean teardown on the happy path is not teardown. Free resources in a finally that cancels idempotently, or leak a websocket per session and discover it hours later as a rising connection count.
- Delegation injects a background result into a live conversation via a single push_frame. The frame abstraction makes an async callback and a spoken sentence indistinguishable to the LLM.
Every leak in this post is a cancellation story, and cancellation in Python async is subtler than it looks — deferred delivery, shielded scopes, the peek-versus-block question of who notices a dead connection. The anyio cancellation piece builds the primitive from source, and the SSE disconnect field report is the same bounded-poll pattern from §5, traced through five middleware layers to a real production leak.