Field report · sibling to Parts 1–5

The Peek That Always Returns False

Disconnect detection through five middleware layers — why the textbook fix lies, why the surgical fix races in principle, and what we'd write if scope didn't matter.

~14 min read · Python 3.11+ · Assumes: Parts 4 & 5

The live asyncio task count on the API pod climbs from 500 to 2,200 over three hours and never comes back down. Nothing in the request log explains it. Nobody is opening tabs at three in the morning. The only thing the dashboard has to offer is a slope.

The graph is the kind of evidence that looks like a smoking gun until you try to point it at something. There is no error rate to anchor on, no latency cliff, no obvious correlate. The pod hits its task ceiling some time around hour six and starts OOMing on the executor queue. By then the leak is so large it dominates everything; the cause has been invisible for a quarter of a day.

This is the applied capstone of the series: Part 4 built anyio's deferred cancellation and the pre-armed peek from source, and Part 5 built BaseHTTPMiddleware's task-group bridge. This post is where the two conspire into one production bug. It is the postmortem of that leak and of its fix, a change small enough that each line gets its own paragraph of justification. But the post is also about what didn't ship. The fix that landed is the smallest possible change. There are two other fixes, one layer up and one layer further up still, each more principled than the last and each with a blast radius that disqualified it from a release-branch hotfix. The interesting work is in seeing why all three exist and what each one is paying for.

From Go

In a Go HTTP handler that streams, you select on <-r.Context().Done() against your work channel. The net/http server cancels the request context when the client disconnects; that cancellation is wired directly into the connection's read loop, with no peek-vs-block dance. Python's equivalent does not come pre-wired. You install it. And the place you install it matters as much as the installation itself.

§1 A leak in slow motion

The product is a Server-Sent Events stream. A user opens a chat tab; the browser issues a long-lived GET /streaming; the handler subscribes to a Redis stream and yields events as they arrive. When the tab closes, the TCP connection ends. The handler should notice and unsubscribe.

The handler is the kind of code that reads like nothing is wrong:

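async def stream_events(stream_info):
    message_queue, cancel_func = stream_info
    try:
        yield ServerSentEvent(event="init-stream", data={"type": "init-stream"})
        while True:  # no break, no return: only an external cancel ends this loop
            try:
                # Wait up to one second for the next message from the Redis stream.
                message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
                if message:
                    yield ServerSentEvent(event="update", data=…, id=…)
            except asyncio.TimeoutError:
                pass  # idle: loop again without yielding anything to the client
    except Exception as e:
        logger.error("Error in SSE stream", error=str(e))
    finally:
        cancel_func()  # unsubscribe from the Redis stream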

The SSE route handler, before the fix. Every iteration either delivers a message or sleeps for a second. Nothing in the loop body ever exits it.

Read the loop carefully. There is no way out. while True with a get() that either returns a message or times out — neither branch sets a flag, neither raises. The only way the function ever returns is if something outside it cancels the task running it. So: what is supposed to cancel it?

The framework's job, in theory. When the client disconnects, FastAPI is supposed to inject a CancelledError into the handler. In practice this happens in exactly two situations:

  1. The response generator yields, the framework tries to send, the send fails on a broken socket. The send wraps the failure and cancels the generator. This works fine on a busy stream — events are flowing, every yield is a chance to discover the closed connection. It fails on an idle stream: no events to yield, no sends to fail, no cancellation, ever.
  2. The application explicitly reads http.disconnect from the receive channel. Uvicorn enqueues that message the moment the TCP read returns zero. But uvicorn does not push it; the application must pull. The handler above never reads receive at all, so the message sits in the queue and the disconnect is invisible.

So an idle SSE client whose tab is closed creates a handler that will live until the process restarts. Across many tabs and a sufficiently quiet channel, the leak is linear in the number of disconnects — which is what we saw climbing on the dashboard.
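The leak can be reproduced without a server. A minimal sketch using only the standard library: idle_handler (a hypothetical name) has the same loop shape as stream_events, with a 10 ms timeout standing in for the one-second poll and the SSE yields dropped. Nothing ever reads a disconnect, so nothing ever cancels the handlers.

```python
import asyncio


async def idle_handler(queue: asyncio.Queue) -> None:
    # Same shape as stream_events: poll with a timeout, never break out.
    while True:
        try:
            await asyncio.wait_for(queue.get(), timeout=0.01)
        except asyncio.TimeoutError:
            pass  # idle channel: nothing to yield, so no send can fail


async def count_survivors(clients: int = 5) -> int:
    # One handler per "tab"; each tab's queue stays empty forever.
    handlers = [asyncio.create_task(idle_handler(asyncio.Queue())) for _ in range(clients)]
    # The tabs are closed, but no one reads http.disconnect, so no one
    # cancels the handlers. Let them run through several poll cycles.
    await asyncio.sleep(0.1)
    alive = sum(1 for task in handlers if not task.done())
    # Test-harness cleanup only; production has no equivalent of this step.
    for task in handlers:
        task.cancel()
    await asyncio.gather(*handlers, return_exceptions=True)
    return alive


if __name__ == "__main__":
    print(asyncio.run(count_survivors()))  # 5: every handler outlived its client
```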

§2 The disconnect contract

The ASGI receive channel is a queue of dicts that look like this:

# Uvicorn pushes these into the per-request queue.
{"type": "http.request", "body": b"…", "more_body": False}
{"type": "http.disconnect"}                # on FIN/RST

The disconnect message is the only authoritative signal that the client is gone. There is no parallel out-of-band channel, no signal, no flag on the request object set by the server. The server pushes one dict into one queue and considers the contract honored. The application has to read it.

"The application has to read it" is the line every fix in this post is rephrasing. The reading is the entire job. The post is about who does the reading, where in the stack they do it, and what other code might be reading the same channel at the same time. The shape of the answer determines the shape of the fix.

Diagram A · the disconnect plumbing

Uvicorn pushes http.disconnect into the per-request receive queue. Five BaseHTTPMiddleware layers each wrap that channel in their own coroutine before it reaches the handler. Every layer is also a yield point.

[Diagram: top to bottom, the stream_events handler (which awaits receive somewhere), then five BaseHTTPMiddleware layers (request context, auth, logging, request timing, signature check), each marked as a yield point; at the bottom, uvicorn's receive queue, where { "type": "http.disconnect" } sits. The receive() path runs through every layer.]

§3 The textbook fix, and its CancelScope trick

The FastAPI documentation, the Starlette changelog, every blog post on long-lived endpoints — they all point to the same one-liner:

while True:
    if await request.is_disconnected():
        break

Reasonable. The check is meant to be cheap. It is supposed to peek at the receive queue: if there is a message ready, look at it; if not, return immediately. Don't block on something that may never arrive. The peek means you can call it once per loop iteration without paying for a real wait.

How does Starlette implement a non-blocking peek on top of a blocking receive()? It uses a trick I find genuinely lovely:

# starlette/requests.py — real source, 1.0.0.
async def is_disconnected(self) -> bool:
    if not self._is_disconnected:
        message: Message = {}

        # If message isn't immediately available, move on
        with anyio.CancelScope() as cs:
            cs.cancel()                  # pre-armed!
            message = await self._receive()

        if message.get("type") == "http.disconnect":
            self._is_disconnected = True
    return self._is_disconnected

Read it slowly. anyio.CancelScope() opens a scope. cs.cancel() on the very next line marks it as cancelled before any await runs. The scope is pre-armed: any await performed inside it will, at the next cancellation checkpoint, raise CancelledError. The scope then catches that cancellation and exits normally.

The intended behavior rests on a property of Python's async semantics that is easy to overlook: an await only hands control back to the event loop if the awaited coroutine actually suspends. If self._receive() can return a message without yielding to the event loop (which happens when a message is already waiting and the coroutine picks it up and returns synchronously), then the pre-armed cancellation never gets a chance to fire. The peek succeeds. You get the message. If, on the other hand, self._receive() has to yield because nothing is waiting, the first yield is also the first cancellation checkpoint, and the pre-arm fires immediately. The peek aborts. You get nothing, and you assume "no message ready."

Mental model

Pre-armed cancel scope = "return if you can do it without ever pausing; otherwise act as if I never asked." It is a beautiful way to write queue.peek() on top of an interface that only exposes queue.get(). It depends entirely on the assumption that _receive() can sometimes return synchronously.

In a stock ASGI application, that assumption holds. _receive() bottoms out in uvicorn's per-request receive(), which only suspends when it has nothing to hand back; when a body chunk or the disconnect is already pending, it returns without yielding to the loop. The peek is real.

And then we wrap it.

§4 Why the trick disarms itself

The request path runs through five BaseHTTPMiddleware layers — signature verification, request timing, logging, auth, and request context. Each is a Starlette middleware, and BaseHTTPMiddleware does not hand its downstream the raw receive channel. It hands down a wrapper that races two things at once — the real receive, and a watch on whether the response has already been sent:

# starlette/middleware/base.py — the real receive_or_disconnect (1.0.0).
async def receive_or_disconnect() -> Message:
    if response_sent.is_set():
        return {"type": "http.disconnect"}

    async with anyio.create_task_group() as task_group:
        async def wrap(func):
            result = await func()
            task_group.cancel_scope.cancel()   # first one home cancels the other
            return result

        task_group.start_soon(wrap, response_sent.wait)  # background sibling
        message = await wrap(wrapped_receive)            # foreground

    if response_sent.is_set():
        return {"type": "http.disconnect"}
    return message

Every BaseHTTPMiddleware instance exposes this as the receive channel to its downstream — a coroutine that opens a fresh task group, with a sibling waiter, on every call.

This is a small piece of structured concurrency. start_soon schedules a background sibling blocked on response_sent.wait(); the foreground await wrap(wrapped_receive) waits on the real receive. Whichever finishes first calls task_group.cancel_scope.cancel() to tear the other down. The detail that kills our peek is the teardown. A task group cannot exit until its children are joined — and joining a still-running sibling means __aexit__ has to await. So receive_or_disconnect always suspends at least once before it returns, even when the underlying receive had a message ready: there is always a sibling to wind down.

That suspension is the checkpoint our pre-arm has been waiting for. Part 4 derives the mechanism behind §3's pre-arm from the anyio source, and Part 5 decodes this exact receive_or_disconnect; the short version is that cancel() does not cancel the running task on the spot. _deliver_cancellation skips the currently running task and reschedules itself with loop.call_soon, so the cancellation only lands the next time the task suspends and control returns to the loop. In a stock app, await receive() on a ready queue returns without ever suspending, so that scheduled cancellation never gets a turn and the peek succeeds. Through BaseHTTPMiddleware, the task-group join forces a suspension no matter what. The instant the host task suspends, the call_soon'd cancellation fires, CancelledError is raised into the join, the scope catches it, and is_disconnected() returns False. The real receive underneath was never consulted.

Diagram B · the peek that never reaches the queue

Pre-arm fires on the first suspension. receive_or_disconnect always suspends to join its background sibling — even when the receive was ready — and the join is where the scheduled cancellation lands.

[Diagram: is_disconnected opens anyio.CancelScope() and calls cs.cancel() (pre-armed), then awaits self._receive(). The middleware's receive opens a task group that spawns a sibling waiter, and __aexit__ must await the join, which suspends. CancelledError lands on that first suspension and is_disconnected returns False. The uvicorn queue is never reached.]

I want to stress that nothing here is broken, in the sense of a bug. Starlette's is_disconnected() behaves exactly as designed. The BaseHTTPMiddleware wrapper behaves exactly as designed. The pre-armed cancel scope behaves exactly as designed. The composition is what fails — a peek-without-yielding only works when the thing below the peek can actually return without yielding. We bought that property and then unwittingly sold it back when we stacked five middleware layers between our handler and uvicorn.

This was verified directly. Logging was added to the handler:

disconnected = await request.is_disconnected()
logger.info("disconnect probe", disconnected=disconnected, in_queue=…)

Every iteration after a deliberate client disconnect: disconnected=False, http.disconnect sitting in the queue. Hundreds of times. The peek never reached the queue. (The logging was removed before the PR — it was a probe, not a fix.)

A peek-without-yielding is a contract with everything below it. We stacked five things in the way and the contract dissolved.

§5 Block, don't peek

If a peek can be disarmed by a yield, the fix is to use something that cannot be disarmed by yielding — because yielding is exactly what it's expecting to do. A blocking await on receive runs to completion. It will get the next message. If that message is http.disconnect, we set a flag and exit.

It is structurally the same as Go's select against ctx.Done(): a parallel routine whose only job is to wait for the cancel signal, and a primary routine whose only job is to do the work and check the flag. The watcher task:

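disconnected = asyncio.Event()

async def _watch_disconnect():
    # Sole reader of this request's receive channel (§6 covers why that matters).
    try:
        while not disconnected.is_set():
            # Blocking read through the middleware chain: there is no pre-arm to disarm.
            msg = await request._receive()
            if msg.get("type") == "http.disconnect":
                disconnected.set()
                return
    except OSError:
        # Transport-level failure: treat the client as gone.
        disconnected.set()

# Keep the reference: the event loop holds tasks only weakly.
watcher = asyncio.create_task(_watch_disconnect())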

The watcher's await request._receive() happily blocks. The middleware chain yields, the task group opens, control returns to the loop, and the loop schedules the next ready callback. When uvicorn enqueues http.disconnect, the chain wakes up, the message bubbles all the way back through the layers, and the watcher sees it. Setting disconnected is the actionable signal.

The main loop gains exactly one check:

while True:
    if disconnected.is_set():
        break
    try:
        message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
        …
    except asyncio.TimeoutError:
        pass

Diagram C · two coroutines and an event

The watcher blocks on receive and trips a flag. The handler polls the flag once per iteration. The only state the two share is one asyncio.Event, so there is nothing else for them to race on.

[Diagram: handler loop: while True: if flag.is_set(): break; msg = await q.get(); yield event(msg). Watcher task: while True: m = await receive(); if m["type"] == "http.disconnect": flag.set(); return. The watcher sets the asyncio.Event; the handler polls it.]
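A runnable sketch of the two coroutines and their Event, under stated assumptions: make_receive is a hypothetical stand-in for request._receive that reports a disconnect after a delay, the SSE yield is reduced to a comment, and the poll interval is shortened so the demo finishes quickly. The finally block is a minimal cleanup; §8 discusses what it still gets wrong.

```python
import asyncio


def make_receive(disconnect_after: float):
    # Hypothetical stand-in for request._receive: blocks, then reports
    # that the client has gone away.
    async def receive() -> dict:
        await asyncio.sleep(disconnect_after)
        return {"type": "http.disconnect"}

    return receive


async def stream(receive, message_queue: asyncio.Queue, cancel_func, poll: float) -> None:
    disconnected = asyncio.Event()

    async def watch_disconnect() -> None:
        try:
            while not disconnected.is_set():
                msg = await receive()  # a blocking read has no pre-arm to disarm
                if msg.get("type") == "http.disconnect":
                    disconnected.set()
                    return
        except OSError:
            disconnected.set()  # transport failure: treat the client as gone

    watcher = asyncio.create_task(watch_disconnect())
    try:
        while True:
            if disconnected.is_set():  # the one new check per iteration
                break
            try:
                await asyncio.wait_for(message_queue.get(), timeout=poll)
                # ...the real handler yields the SSE event here...
            except asyncio.TimeoutError:
                pass
    finally:
        watcher.cancel()  # no effect if the watcher already returned
        cancel_func()     # unsubscribe


async def demo() -> list:
    cleaned = []
    await asyncio.wait_for(
        stream(make_receive(0.05), asyncio.Queue(), lambda: cleaned.append("unsubscribed"), poll=0.02),
        timeout=2.0,
    )
    return cleaned


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['unsubscribed']
```

The outer wait_for only guards the demo against hanging; the handler exits on its own about one poll interval after the disconnect.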

The watcher catches only OSError — the transport-level family (BrokenPipeError, ConnectionResetError, ConnectionAbortedError). Anything else is a bug somewhere up the receive chain and must propagate; silently flagging "disconnected" on an arbitrary failure would tear down a still-connected client's subscription. Starlette's own StreamingResponse follows the same convention internally.
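The exception policy is easy to check in isolation. In this sketch, raising() is a hypothetical receive that fails immediately. A ConnectionResetError (an OSError subclass) should flag the client as gone, while a RuntimeError should leave the flag alone and surface to whoever awaits the watcher.

```python
import asyncio


async def watch_disconnect(receive, disconnected: asyncio.Event) -> None:
    try:
        while not disconnected.is_set():
            msg = await receive()
            if msg.get("type") == "http.disconnect":
                disconnected.set()
                return
    except OSError:
        # BrokenPipeError, ConnectionResetError, ConnectionAbortedError, ...
        disconnected.set()


def raising(exc: BaseException):
    # Hypothetical receive that fails the way a broken chain might.
    async def receive() -> dict:
        raise exc

    return receive


async def demo() -> tuple:
    reset = asyncio.Event()
    await watch_disconnect(raising(ConnectionResetError()), reset)

    bug = asyncio.Event()
    try:
        await watch_disconnect(raising(RuntimeError("bug up the chain")), bug)
        propagated = False
    except RuntimeError:
        propagated = True
    return reset.is_set(), bug.is_set(), propagated


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, False, True)
```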

Trap

The watcher consumes from receive. That only works if we're the only reader. An asyncio.Queue hands each item to exactly one consumer: with two readers, whoever wakes first eats the message and the other never sees it. A disconnect consumed by the wrong side is a disconnect we never noticed; a body chunk pulled out from under a body parser is corruption. The next section asks whether there is a second reader on this channel.
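The split is easy to demonstrate with a plain asyncio.Queue standing in for the channel (an assumption of the sketch; the reader names are illustrative). Two readers block on the same queue, one disconnect arrives, and exactly one of them ever sees it. Which one wins depends on who started waiting first.

```python
import asyncio


async def demo() -> tuple:
    channel: asyncio.Queue = asyncio.Queue()
    received = []

    async def reader(name: str) -> None:
        message = await channel.get()
        received.append((name, message["type"]))

    readers = [asyncio.create_task(reader("watcher")), asyncio.create_task(reader("listener"))]
    await asyncio.sleep(0)  # let both readers block in get()
    channel.put_nowait({"type": "http.disconnect"})
    done, pending = await asyncio.wait(readers, timeout=0.1)
    for task in pending:  # the loser is still waiting and would wait forever
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), len(received)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, 1, 1): one disconnect, one reader saw it
```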

§6 But who owns receive?

This is the question a reviewer asked, and it is the one I think the whole story turns on. The watcher works if it is alone with the channel. If something else in the request lifecycle is also pulling from receive, the two readers split the message stream non-deterministically — a disconnect message consumed by the other side is a disconnect we never notice, and the leak is unfixed in some sessions and not others, which is worse than unfixed everywhere.

The plausible second reader is Starlette's own response machinery. EventSourceResponse inherits from StreamingResponse, and StreamingResponse.__call__ historically spawns a listen_for_disconnect task that reads receive while the response is being streamed. If that task ran here, it and the watcher would be racing on the same channel.

The honest way to find out is to read the source. Starlette's branch is sharp:

# starlette/responses.py (paraphrased).
# spec_version arrives as a string such as "2.4"; compare it as a tuple of ints.
spec_version = tuple(map(int, scope.get("asgi", {}).get("spec_version", "2.0").split(".")))
if spec_version >= (2, 4):
    try:
        await self.stream_response(send)
    except OSError:
        raise ClientDisconnect()
else:
    async with anyio.create_task_group() as tg:
        tg.start_soon(self.listen_for_disconnect, receive)  # races us
        tg.start_soon(self.stream_response, send)

ASGI specification 2.4 (March 2022) introduced the convention that the server raises OSError from send() when the underlying transport is gone. Starlette adopted this in 0.27 — if the spec version is 2.4 or later, the response trusts the server to surface disconnects through send and stops spawning its own receive listener. If the spec version is lower, Starlette still spawns listen_for_disconnect, and that task would race our watcher. (Part 5 §6 walks this same branch from the middleware side.)
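One cheap way to act on this version dependency is to compute the same branch the response will take. spawns_receive_listener is a hypothetical helper, not a Starlette API. It parses spec_version into a tuple of ints before comparing, because comparing the raw strings would order "2.10" below "2.4".

```python
def spawns_receive_listener(scope: dict) -> bool:
    """True if StreamingResponse would take the pre-2.4 path and read receive.

    Hypothetical helper mirroring the branch above; not a Starlette API.
    """
    raw = scope.get("asgi", {}).get("spec_version", "2.0")
    # Compare as a tuple of ints: as strings, "2.10" would sort below "2.4".
    spec_version = tuple(int(part) for part in raw.split("."))
    return spec_version < (2, 4)


if __name__ == "__main__":
    print(spawns_receive_listener({"asgi": {"version": "3.0", "spec_version": "2.4"}}))  # False
```

Logging a warning whenever this returns True on a streaming route would make the assumption announce its own breakage instead of resurfacing as a leak.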

The server here is uvicorn 0.30.6, which advertises spec_version="2.4" (uvicorn/protocols/http/h11_impl.py:205, httptools_impl.py:217). Top branch. listen_for_disconnect is dead code in this process. To check the other potential consumer: FastAPI 0.135's new fastapi.sse.EventSourceResponse (introduced March 2026 in PR #15030) doesn't read receive either — the SSE branch in fastapi/routing.py uses an anyio memory object stream and a fail_after-based keepalive, never an ASGI await receive(). The new first-party SSE primitive doesn't ship with its own disconnect detection. Cancellation is, by design, still Starlette's responsibility.

So in this stack the watcher is the sole reader on the receive channel, and the race is theoretical. The reviewer's concern is the right principled worry; the answer is "no race here," but the answer depends on a version number that isn't a property of the fix's code.

Diagram D · the branch that makes the watcher safe

In Starlette's StreamingResponse.__call__, ASGI ≥ 2.4 picks the top path — only send is used; receive is never touched. Below 2.4, a listen_for_disconnect task is spawned on receive, which would compete with our watcher.

[Diagram: scope["asgi"]["spec_version"] >= (2, 4)? YES: stream_response(send) only writes and never reads receive, so the watcher is the sole reader. NO: listen_for_disconnect is spawned in the task group, reads receive, and races our watcher. uvicorn 0.30.6 advertises "2.4", so we take the top path; the bottom is dead code here.]

Subtle

The watcher's safety here is a property of versions, not of the code in the fix. A future migration to daphne <3 or hypercorn <0.14 — or any future Starlette release that decides to spawn a receive listener again — turns this back into a real race overnight. Document the assumption in the docstring; future-you will not remember it.

The fix that lives at the response layer

If the question is "who owns receive?", the most honest answer is: whoever the ASGI runtime hands the channel to. Handlers receive it indirectly, through the request object. Response classes receive it directly, as a function parameter. That difference is structural — and it is exactly why an older /events endpoint in the same codebase has been doing the right thing for a long time without ever needing this fix.

That endpoint's custom SSE response class is its own ASGI callable:

class SSEResponse(Response):
    async def __call__(self, scope, receive, send) -> None:
        disconnected = asyncio.Event()

        async def watch_disconnect():
            while not disconnected.is_set():
                message = await receive()  # this receive, the one we own
                if message["type"] == "http.disconnect":
                    disconnected.set()
                    return

        disconnect_task = asyncio.create_task(watch_disconnect())
        ...

The receive in watch_disconnect is the parameter passed to __call__. It is, by construction, the only reference to that channel in this layer. The body of the response is the listener. Ownership is not something the code has to assert; it is a property of the function signature. When the ASGI runtime sends us receive, we are responsible for it for the lifetime of this call, and no one else has a handle to it.
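A self-contained sketch of the response-layer shape, written as a bare ASGI callable rather than the codebase's SSEResponse (whose body is elided above). DisconnectAwareStream is hypothetical: it streams strings from an asyncio.Queue as SSE data lines and owns the receive it is called with. The demo drives it with fake receive and send callables.

```python
import asyncio


class DisconnectAwareStream:
    """Hypothetical ASGI app: streams queue items as SSE until the client leaves.

    The receive passed to __call__ is owned by this call; nothing else holds it.
    """

    def __init__(self, source: asyncio.Queue, poll: float = 1.0) -> None:
        self.source = source
        self.poll = poll

    async def __call__(self, scope, receive, send) -> None:
        disconnected = asyncio.Event()

        async def watch_disconnect() -> None:
            while not disconnected.is_set():
                message = await receive()  # the receive we own
                if message["type"] == "http.disconnect":
                    disconnected.set()
                    return

        watcher = asyncio.create_task(watch_disconnect())
        try:
            await send({
                "type": "http.response.start",
                "status": 200,
                "headers": [(b"content-type", b"text/event-stream")],
            })
            while not disconnected.is_set():
                try:
                    item = await asyncio.wait_for(self.source.get(), timeout=self.poll)
                except asyncio.TimeoutError:
                    continue  # idle: re-check the flag
                await send({
                    "type": "http.response.body",
                    "body": f"data: {item}\n\n".encode(),
                    "more_body": True,
                })
        finally:
            watcher.cancel()


async def demo() -> list:
    source: asyncio.Queue = asyncio.Queue()
    source.put_nowait("hello")
    sent = []

    async def receive() -> dict:
        await asyncio.sleep(0.05)  # the client leaves after 50 ms
        return {"type": "http.disconnect"}

    async def send(message: dict) -> None:
        sent.append(message["type"])

    app = DisconnectAwareStream(source, poll=0.01)
    await asyncio.wait_for(app({"type": "http"}, receive, send), timeout=2.0)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['http.response.start', 'http.response.body']
```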

That is the principled location for a disconnect watcher in an ASGI app. The handler-level watcher is what you write when you can't move to the response layer cheaply; the response-level watcher is what you write when you can. In a follow-up, consolidating the streaming endpoint onto that response-layer shape, or onto an equivalent that wraps EventSourceResponse, removes the receive-ownership ambiguity entirely, along with the dependency on the ASGI 2.4 escape clause. Same fix; one layer up; structurally smaller surface for someone to break later.

The fix one layer further up

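There's a third option, more invasive again: rewrite the BaseHTTPMiddleware layers as pure ASGI callables. That doesn't resolve the ownership question so much as retire it: once the textbook check works again, nothing needs a dedicated watcher reading receive. What the rewrite does remove is the load-bearing buffering bug, meaning BaseHTTPMiddleware's wrapping of the receive and send channels, including the per-call task-group join that made the textbook is_disconnected() peek fail in §4. That wrapping is what put us in the position of needing any watcher at all.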

Starlette's maintainers track this in Discussion #2160 and #1729, and explicitly recommend pure ASGI for middleware adjacent to streaming responses — both for the disconnect-propagation reasons that caused our leak and for a 2-5× throughput win they observe in benchmarks. The pattern is uglier in 30-line bursts and considerably simpler in aggregate; the cost is per-middleware care for ContextVars, request.state mutation, exception ordering, and structured-logging binding. Some layers convert easily; the ones that mutate request state are the work.

If all five were converted, two things change. is_disconnected()'s pre-armed-peek trick would start working again, because the middleware-induced yield disappears. And EventSourceResponse's own disconnect handling, which, recall, relies on Starlette's send-raises-OSError path, would see that OSError directly instead of having it swallowed by middleware-level buffering. The watcher becomes redundant. Whether you still want it as defense-in-depth is a judgment call.
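For contrast, a minimal pure-ASGI middleware: a hypothetical timing layer that adds an x-response-time-ms header (the header name is invented for the example). It wraps send to stamp the response, but hands receive to the downstream app untouched. The app's receive is therefore the very object the server passed in, and a pre-armed peek on it behaves as if there were no middleware at all.

```python
import asyncio
import time


class TimingMiddleware:
    """Hypothetical pure-ASGI middleware that stamps a response-time header."""

    def __init__(self, app) -> None:
        self.app = app

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        start = time.perf_counter()

        async def send_with_timing(message: dict) -> None:
            if message["type"] == "http.response.start":
                elapsed_ms = (time.perf_counter() - start) * 1000
                headers = list(message.get("headers", []))
                headers.append((b"x-response-time-ms", f"{elapsed_ms:.1f}".encode()))
                message = {**message, "headers": headers}
            await send(message)

        # receive goes down untouched: no wrapper, no task group, no forced suspension.
        await self.app(scope, receive, send_with_timing)


async def demo() -> tuple:
    seen = {}
    sent = []

    async def receive() -> dict:
        return {"type": "http.disconnect"}

    async def send(message: dict) -> None:
        sent.append(message)

    async def app(scope, app_receive, app_send) -> None:
        seen["same_receive"] = app_receive is receive
        await app_send({"type": "http.response.start", "status": 200, "headers": []})

    await TimingMiddleware(app)({"type": "http"}, receive, send)
    return seen["same_receive"], [name for name, _ in sent[0]["headers"]]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, [b'x-response-time-ms'])
```

Layers that mutate request state or bind ContextVars need more care than this, which is where the migration cost actually lives.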

Three fixes, ranked by blast radius

| Fix | Lives at | Resolves | Cost |
|---|---|---|---|
| handler watcher (the shipped fix) | route handler | the leak | 40 lines; depends on ASGI ≥ 2.4 |
| response-level watcher | custom Response class | the leak + receive-ownership ambiguity | consolidate the streaming endpoint onto the response-layer class; touches every caller |
| pure-ASGI middleware | each middleware module | the leak + the buffering bug; restores is_disconnected() | rewrite the remaining layers, with per-layer ContextVar/auth/logging care |

The shape of the fix tells you the shape of the regret. Handler-level watcher: ships today, fragile to runtime version. Response-level watcher: principled, single owner, requires a refactor every caller has to follow. Pure-ASGI middleware: most principled, restores the framework's documented behavior, biggest blast radius. We took the smallest one — and the post you're reading is the documentation that explains why the smallest one is also the one that will need revisiting first.

§7 What it cost, and why one second

The reproduction is an end-to-end harness: five concurrent SSE clients, one chunk read each, abrupt connection drop, count of read_stream tasks via asyncio.all_tasks() at intervals. Before the fix, the tasks never go away. After:

| Measurement | Before the fix | After the fix |
|---|---|---|
| tasks immediately after 5 disconnects | 5 | 5 |
| tasks at t + 1 s | 5 | 0 |
| tasks at t + 10 s | 5 (permanent) | 0 |
| cleanup latency | never | ~1006 ms |

The interesting number is 1006 ms. Not because that is how long disconnect detection takes — the watcher trips on disconnect within microseconds — but because of how the watcher couples to the rest of the loop. The handler checks the flag at the top of each iteration; iterations are paced by asyncio.wait_for(queue.get(), timeout=1.0). On an idle channel, the loop is sleeping inside wait_for when the watcher trips. The flag becomes True instantly, but nobody is reading it yet. The next iteration only happens when wait_for times out a second later.

You can drive the latency down to nothing by replacing the timeout-based poll with asyncio.wait() over two tasks, one wrapping queue.get() and one wrapping event.wait(), with return_when=asyncio.FIRST_COMPLETED: wake on either a real message or a disconnect, whichever comes first. (Since Python 3.11, asyncio.wait() rejects bare coroutines, so both must be wrapped with asyncio.create_task(), and the losing task has to be cancelled.) The shipped fix doesn't do this. The simplicity argument is straightforward: a second of leak time across a transient disconnect is dramatically less interesting than the leak we just stopped, and the implementation has one fewer thing that can race. Worth it.
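A sketch of the zero-latency variant, assuming Python 3.11+. next_or_disconnect is a hypothetical helper: it races a queue.get() task against a disconnected.wait() task, prefers delivering a message that has already arrived, and cancels whichever task lost. Cancelling a pending Queue.get() takes nothing off the queue.

```python
import asyncio
from typing import Any, Optional


async def next_or_disconnect(queue: asyncio.Queue, disconnected: asyncio.Event) -> Optional[Any]:
    """Return the next message, or None if the client went away first."""
    get_task = asyncio.create_task(queue.get())
    gone_task = asyncio.create_task(disconnected.wait())
    try:
        done, _ = await asyncio.wait({get_task, gone_task}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cancel the loser (or both, if we are being cancelled ourselves).
        for task in (get_task, gone_task):
            if not task.done():
                task.cancel()
    if get_task in done:
        return get_task.result()  # deliver a message that already arrived
    return None


async def demo() -> tuple:
    q: asyncio.Queue = asyncio.Queue()
    gone = asyncio.Event()
    q.put_nowait("update")
    first = await next_or_disconnect(q, gone)   # a message is ready
    gone.set()
    second = await next_or_disconnect(q, gone)  # flag set, queue empty
    return first, second, q.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('update', None, 0)
```

In the handler, the loop body would become message = await next_or_disconnect(message_queue, disconnected), breaking when it returns None. The cost is two task creations per message, which is the extra moving part the shipped fix chose to avoid.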

§8 What's still wrong

The handler now exits cleanly on disconnect, but the cleanup it triggers, cancel_func(), is a synchronous wrapper around a fire-and-forget asyncio.create_task(cancel_async()) inside the broadcaster. That has its own problem. It has the same shape as the watcher (a task spawned with create_task), with one difference: nobody holds a reference to this one. The event loop keeps only weak references to tasks, so a task you create and don't hold a reference to is a candidate for the garbage collector, and a cleanup task that gets reaped mid-run leaves the subscription it was supposed to cancel still listening. An existing partial fix, a module-level set() that holds strong references until the cleanup completes, keeps it from biting in practice, but a real fix wants the broadcaster's subscribe to return an async cancel function and have callers await it. Bigger surface area, separate change.
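Both shapes side by side, as a sketch. fire_and_forget is the strong-reference pattern the asyncio documentation recommends for create_task: a module-level set plus a done callback that discards the task. Subscription.cancel is a hypothetical awaitable cancel standing in for the broadcaster's, and its sleep(0) stands in for the Redis round trip.

```python
import asyncio

# Strong references to in-flight cleanup tasks: the loop holds tasks only weakly.
_background_tasks: set = set()


def fire_and_forget(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)  # release once finished
    return task


class Subscription:
    """Hypothetical broadcaster handle with an awaitable cancel."""

    def __init__(self) -> None:
        self.active = True

    async def cancel(self) -> None:
        await asyncio.sleep(0)  # stands in for the UNSUBSCRIBE round trip
        self.active = False


async def demo() -> tuple:
    today, proposed = Subscription(), Subscription()
    fire_and_forget(today.cancel())  # current shape: sync caller, task kept alive by the set
    await proposed.cancel()          # proposed shape: the caller awaits its own cleanup
    await asyncio.sleep(0.01)        # give the background task time to finish
    return today.active, proposed.active, len(_background_tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (False, False, 0)
```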

There's also a second, narrower issue surfaced in follow-up review: if request._receive() raises a non-OSError exception, the watcher task finishes with the exception unhandled. If the main loop later closes for another reason (an upstream cancellation, a server shutdown), the await watcher in the cleanup re-raises that exception before cancel_func() runs, and the Redis subscription leaks. The fix is to wrap await watcher in a try/except/finally that always reaches cancel_func(). Worth landing.
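A sketch of a cleanup that always reaches cancel_func(). close_stream is a hypothetical helper. It uses asyncio.gather(..., return_exceptions=True) in place of an explicit except clause, so the watcher's CancelledError or a stray non-OSError exception comes back as a value and gets logged, and cancel_func() runs in a finally either way.

```python
import asyncio
import logging

logger = logging.getLogger("sse")


async def close_stream(watcher: asyncio.Task, cancel_func) -> None:
    """Stop the watcher and always run the subscription cleanup."""
    watcher.cancel()  # no effect if the watcher has already finished
    try:
        # return_exceptions=True hands back the watcher's CancelledError or
        # stray exception as a value instead of raising it here.
        (outcome,) = await asyncio.gather(watcher, return_exceptions=True)
        if isinstance(outcome, Exception):  # CancelledError is not an Exception
            logger.error("disconnect watcher failed: %r", outcome)
    finally:
        cancel_func()  # reached even if this task is itself being cancelled


async def demo() -> tuple:
    cleaned = []

    async def broken_watcher() -> None:
        raise RuntimeError("non-OSError from the receive chain")

    watcher = asyncio.create_task(broken_watcher())
    await asyncio.sleep(0)  # let it fail before cleanup starts
    await close_stream(watcher, lambda: cleaned.append(True))
    return watcher.done(), cleaned


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, [True]), after logging the RuntimeError
```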

And then the broader story, the one §6 alludes to. The remaining three BaseHTTPMiddleware layers are still there, still inserting yields on every receive call, still buffering streaming responses through anyio memory channels, still swallowing client disconnects on the response side. The pure-ASGI migration is the right next move, and it is the move that turns this watcher into either defense-in-depth or dead code. Either outcome is fine. The middleware story is the one that determines which.

§9 Take with you

Read next
The pure-ASGI migration — five middleware layers, three remaining, and what changes when the textbook fix starts working again

The post you just read is one half of the story. The other half is what happens when the load-bearing buffering bug under all of this goes away. Converting BaseHTTPMiddleware layers to pure ASGI callables is the canonical Starlette move; it's also the move that turns this entire PR into defense-in-depth. Subject of the next field report.