# Streaming SSE from Durable Endpoints (Beta)
Durable Endpoints can stream data back to clients in real time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data — while keeping the durability guarantees of durable steps.
Streaming works across multiple steps within a single endpoint invocation, and handles the transition from sync to async mode seamlessly. If a step fails and retries, any data streamed during that step is automatically rolled back on the client.
> Note: Streaming SSE from Durable Endpoints is currently only available in the TypeScript SDK.
## When to use streaming

- AI inference — Stream LLM tokens to the browser as they're generated, so users see results immediately.
- Status updates — Send progress messages during long-running endpoint executions.
- Making existing streaming endpoints durable — Wrap your existing streaming HTTP endpoints with steps to add retries and observability without changing their behavior.
## Quick start

### Server

Import `stream` from `inngest` alongside `step`, then use `stream.push()` or `stream.pipe()` inside your endpoint handler:
```ts
import Anthropic from "@anthropic-ai/sdk";
import { step, stream } from "inngest";
import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async (req) => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");
    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });
    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);
    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });
    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return Response.json("\nDone!");
});
```
### Client

Use `streamRun()` from `inngest/durable-endpoints` to consume the stream. It connects to your endpoint, parses the SSE frames, follows any sync-to-async redirects, and manages rollback — all automatically.

Each `stream.push()` call or `stream.pipe()` yield on the server fires the `onData` callback on the client with that chunk. Using the server example above, the client would receive chunks in this order: `"Generating...\n"`, then each LLM token from the generate step, then `"\nTranslating...\n"`, then each LLM token from the translate step, and finally `"\nDone!"` as the result.
```tsx
"use client";

import { useState } from "react";
import { streamRun } from "inngest/durable-endpoints";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);

  async function run() {
    setChunks([]);
    await streamRun<string>("/api/generate", {
      parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
      onData: (chunk) => {
        setChunks((prev) => [...prev, chunk]);
      },
      onRollback: (count) => {
        // A step failed and will retry — remove the chunks it produced
        setChunks((prev) => prev.slice(0, prev.length - count));
      },
      onResult: (data) => {
        setChunks((prev) => [...prev, String(data)]);
      },
    });
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}
```
## Server API

### `stream.push(data)`

Send a single chunk of data to the client as an SSE frame.

```ts
stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
```

- Accepts any JSON-serializable value.
- Fire-and-forget — does not return a value or block execution.
- Synchronous from the caller's perspective.
- Silent no-op outside of an Inngest execution context, so your code works the same when called outside of a durable endpoint.
`push()` is ideal for one-off status messages. It can also be used with provider SDK event callbacks for streaming, though `pipe()` is usually simpler for that use case:

```ts
// Alternative to pipe() — use push() with an event callback
const response = client.messages.stream({ /* ... */ });
response.on("text", (text) => stream.push(text));
const fullText = await response.finalText();
```
### `stream.pipe(source)`

Pipe a stream of data to the client. Each chunk from the source is sent as an SSE frame in real time. When the source is fully consumed, `pipe()` resolves with the concatenated text of all chunks — so it both streams to the client and collects the result for you.

The simplest case is piping a `ReadableStream`, such as a `fetch` response body:

```ts
const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk
```
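To make the collect-while-streaming behavior concrete, here is a standalone sketch of what `pipe()` conceptually does with a `ReadableStream`: decode each byte chunk, forward it immediately, and resolve with the concatenation. The `sendFrame` callback is a hypothetical stand-in for the SSE transport and is not part of the inngest API.

```typescript
// Conceptual sketch of pipe()'s ReadableStream handling: decode each
// byte chunk, forward it immediately, and resolve with the full text.
// `sendFrame` is a hypothetical stand-in for the real SSE transport.
async function pipeReadable(
  source: ReadableStream<Uint8Array>,
  sendFrame: (chunk: string) => void,
): Promise<string> {
  const decoder = new TextDecoder();
  const reader = source.getReader();
  let collected = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value, { stream: true });
    sendFrame(text);   // streamed to the client in real time
    collected += text; // accumulated for the return value
  }
  return collected;
}

// In-memory stream standing in for a fetch response body:
const encoder = new TextEncoder();
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(encoder.encode("Hello, "));
    controller.enqueue(encoder.encode("world"));
    controller.close();
  },
});

const sent: string[] = [];
const full = await pipeReadable(body, (c) => sent.push(c));
console.log(sent); // ["Hello, ", "world"]
console.log(full); // "Hello, world"
```

The same shape generalizes to the other accepted source types: stream each chunk out as it arrives, and return everything at the end.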
When you need to transform or filter chunks before they're sent, pass an async generator function. Each `yield` sends one chunk to the client:

```ts
const text = await stream.pipe(async function* () {
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (event.type === "content_block_delta") {
      yield event.delta.text;
    }
  }
});
```
`pipe()` accepts three source types:

- `ReadableStream` — piped directly, decoded from bytes to string chunks.
- `AsyncIterable<string>` — each value in the iterable becomes a chunk.
- `() => AsyncIterable<string>` — a function that returns an async iterable. This is what lets you pass `async function*` generators directly to `pipe()`.

Outside of an Inngest execution context, `pipe()` resolves with an empty string.
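Those three shapes can be funneled into one iteration path. The `toIterable` helper below is an illustrative sketch of such normalization; the name and implementation are mine, not the SDK's.

```typescript
type PipeSource =
  | ReadableStream<Uint8Array>
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

// Normalize any accepted source into an async iterable of string chunks.
// `toIterable` is a hypothetical helper, not part of the inngest API.
async function* toIterable(source: PipeSource): AsyncIterable<string> {
  if (typeof source === "function") source = source(); // unwrap generator fns
  if (source instanceof ReadableStream) {
    const decoder = new TextDecoder();
    const reader = source.getReader();
    for (;;) {
      const { done, value } = await reader.read();
      if (done) return;
      yield decoder.decode(value, { stream: true });
    }
  }
  for await (const chunk of source) yield chunk;
}

// A generator function source yields its chunks unchanged:
const collected: string[] = [];
for await (const chunk of toIterable(async function* () {
  yield "a";
  yield "b";
})) {
  collected.push(chunk);
}
console.log(collected); // ["a", "b"]
```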
## Client API

### `streamRun(url, options)`

The primary way to consume a streaming Durable Endpoint. Import it from `inngest/durable-endpoints`:

```ts
import { streamRun } from "inngest/durable-endpoints";
```

`streamRun()` returns a thenable — await it to drive the stream to completion. The three core callbacks handle the vast majority of use cases:
- `onData(chunk)` — Called for each chunk. Each `stream.push()` call or `stream.pipe()` yield on the server produces one `onData` call on the client.
- `onRollback(count)` — Called when a step fails and will retry. `count` is the number of chunks from that step that have been removed. Update your UI to discard them.
- `onResult(data)` — Called with the endpoint's return value when execution completes.
Because `stream.push()` accepts any JSON-serializable value, chunks are typed as `unknown` by default. Use the `parse` option to narrow them to a specific type — it receives the deserialized value (not raw text), and its return type becomes the `TData` generic:

```ts
await streamRun<string>("/api/generate", {
  parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
  onData: (chunk) => { /* chunk is now typed as string */ },
  onRollback: (count) => { /* ... */ },
  onResult: (data) => { /* ... */ },
});
```
### Additional options

| Option | Type | Description |
|---|---|---|
| `signal` | `AbortSignal` | Cancel the stream. |
| `fetch` | `typeof fetch` | Custom `fetch` implementation. |
| `onStepRunning` | `(stepId: string, data?: unknown) => void` | A step started executing. `data` is step metadata, if any. |
| `onStepCompleted` | `(stepId: string, data?: unknown) => void` | A step completed. `data` is the step's return value. |
| `onStepErrored` | `(stepId: string, info: { willRetry: boolean; error: string }) => void` | A step errored. |
| `onMetadata` | `(runId: string) => void` | Run metadata received (always first). |
| `onDone` | `() => void` | Stream fully consumed. |
| `onError` | `(error: unknown) => void` | Stream-level error (network failure, non-200 response, etc.). |
### Async iteration

`streamRun()` also implements `Symbol.asyncIterator`, so you can use `for await` to consume parsed data chunks directly. Callbacks still fire during iteration.

```ts
const run = streamRun<string>("/api/generate", {
  parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
  onRollback: (count) => { /* ... */ },
});

for await (const chunk of run) {
  console.log(chunk);
}
```

A `streamRun()` instance can only be consumed once — either by awaiting it or by iterating it. Attempting to consume it a second time will throw an error.
### `subscribeToRun(options)` — low-level

For advanced use cases, `subscribeToRun()` is a low-level async generator that yields raw SSE frames. It follows sync-to-async redirects transparently but does not manage rollback or chunk accumulation.
```ts
import { subscribeToRun } from "inngest/durable-endpoints";

for await (const frame of subscribeToRun({ url: "/api/generate" })) {
  switch (frame.type) {
    case "stream":
      console.log("Data:", frame.data, "Step:", frame.step_id);
      break;
    case "inngest.step":
      console.log(`Step ${frame.step_id}: ${frame.status}`);
      break;
    case "inngest.result":
      console.log("Result:", frame.data);
      break;
    case "inngest.metadata":
      console.log("Run ID:", frame.run_id);
      break;
    case "inngest.redirect_info":
      // Handled automatically — included for visibility
      console.log("Redirecting to async stream");
      break;
  }
}
```
## How it works

### Sync-to-async transitions

When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to `step.sleep()`, `step.waitForEvent()`, or a retry), the SDK sends a redirect frame telling the client where to reconnect, and the stream continues through the Inngest server.

`streamRun()` handles this redirect automatically — the client sees a single continuous stream regardless of sync-to-async transitions.
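As a rough mental model (not the SDK's actual implementation), redirect-following looks like this, with a hypothetical `connect` function standing in for opening an SSE connection and a simplified `Frame` type:

```typescript
type Frame =
  | { type: "stream"; data: unknown }
  | { type: "inngest.redirect_info"; url: string }
  | { type: "inngest.result"; data: unknown };

// Follow redirect frames so the caller sees one continuous stream.
// `connect` is a hypothetical stand-in for opening an SSE connection.
async function* followRedirects(
  url: string,
  connect: (url: string) => AsyncIterable<Frame>,
): AsyncIterable<Frame> {
  let next: string | undefined = url;
  while (next !== undefined) {
    const current = next;
    next = undefined;
    for await (const frame of connect(current)) {
      if (frame.type === "inngest.redirect_info") {
        next = frame.url; // reconnect to the async continuation
        break;
      }
      yield frame;
      if (frame.type === "inngest.result") return; // terminal frame
    }
  }
}

// Two mock connections: the sync stream redirects mid-run, and the
// async stream carries the rest. The consumer sees one stream.
const connections: Record<string, Frame[]> = {
  "/api/generate": [
    { type: "stream", data: "Generating..." },
    { type: "inngest.redirect_info", url: "async-stream" },
  ],
  "async-stream": [
    { type: "stream", data: "token" },
    { type: "inngest.result", data: "Done!" },
  ],
};

const seen: Frame[] = [];
for await (const frame of followRedirects("/api/generate", async function* (u: string) {
  yield* connections[u];
})) {
  seen.push(frame);
}
console.log(seen.map((f) => f.type)); // ["stream", "stream", "inngest.result"]
```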
### Streaming activation

Streaming is activated lazily. The endpoint only sends an SSE response if:

- The client sends the `Accept: text/event-stream` header (which `streamRun()` does automatically), and
- Your code calls `stream.push()` or `stream.pipe()` during execution.

If neither `push()` nor `pipe()` is called, the endpoint behaves like a regular non-streaming Durable Endpoint.
### Rollback on retry

When a step fails and retries, any data streamed during that step is automatically rolled back on the client:

- Each chunk is tagged with the step that produced it.
- When a step completes successfully, its chunks are "committed" — they can never be rolled back.
- When a step errors, uncommitted chunks from that step are removed, and the `onRollback` callback fires with the count of removed chunks.
- On the retry attempt, the step streams fresh data that replaces what was rolled back.

Data streamed outside of a `step.run()` is never rolled back.
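The commit and rollback bookkeeping described above can be modeled in a few lines. This `ChunkBuffer` class is an illustrative sketch of the client-side accounting, not the SDK's internal implementation:

```typescript
// Sketch of client-side rollback accounting: chunks are tagged with the
// step that produced them; completing a step commits its chunks, and a
// step error removes that step's uncommitted chunks.
class ChunkBuffer<T> {
  private chunks: { data: T; stepId?: string; committed: boolean }[] = [];

  push(data: T, stepId?: string) {
    // Chunks outside any step (no stepId) are committed immediately,
    // so they are never rolled back.
    this.chunks.push({ data, stepId, committed: stepId === undefined });
  }

  commit(stepId: string) {
    for (const c of this.chunks) if (c.stepId === stepId) c.committed = true;
  }

  rollback(stepId: string): number {
    const before = this.chunks.length;
    this.chunks = this.chunks.filter((c) => c.stepId !== stepId || c.committed);
    return before - this.chunks.length; // the count passed to onRollback
  }

  snapshot(): T[] {
    return this.chunks.map((c) => c.data);
  }
}

// A failing step's chunks disappear; the retry's chunks replace them.
const buf = new ChunkBuffer<string>();
buf.push("Generating...\n", "generate");
buf.push("Once upon", "generate");
const removed = buf.rollback("generate"); // step failed: both chunks removed
buf.push("Generating...\n", "generate");  // retry streams fresh data
buf.commit("generate");                   // step succeeded: chunk committed
console.log(removed);        // 2
console.log(buf.snapshot()); // ["Generating...\n"]
```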
## SSE frame types

The stream uses SSE with these frame types. The `inngest.*` frames are internal protocol frames handled by `streamRun()` automatically — only `stream` frames contain user data.

| Event name | Payload | Purpose |
|---|---|---|
| `inngest.metadata` | `{ run_id }` | Always first. Identifies the run. |
| `stream` | `{ data, step_id? }` | User data from `push()` / `pipe()`. |
| `inngest.step` | `{ step_id, status, ... }` | Step lifecycle boundaries (running, completed, errored). |
| `inngest.redirect_info` | `{ run_id, token, url? }` | Tells the client to reconnect for async continuation. |
| `inngest.result` | The endpoint's return value | Terminal frame — closes the stream. |
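For reference, frames in this shape can be parsed with standard SSE line handling: `event:` names the frame type, `data:` carries a JSON payload, and frames are separated by blank lines. This minimal parser is a sketch (`streamRun()` does all of this for you), and the real wire format may carry fields beyond what is shown here:

```typescript
// Parse a raw SSE payload into { event, data } frames. Standard SSE:
// frames are separated by blank lines, `event:` names the frame type,
// and `data:` lines carry the payload (JSON in this protocol).
function parseSSE(raw: string): { event: string; data: unknown }[] {
  return raw
    .split("\n\n")
    .filter((block) => block.trim() !== "")
    .map((block) => {
      let event = "message"; // SSE default event name
      const dataLines: string[] = [];
      for (const line of block.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
      }
      return { event, data: JSON.parse(dataLines.join("\n")) };
    });
}

const frames = parseSSE(
  'event: inngest.metadata\ndata: {"run_id":"01ABC"}\n\n' +
    'event: stream\ndata: {"data":"Hello","step_id":"generate"}\n\n',
);
console.log(frames[0].event); // "inngest.metadata"
console.log(frames[1].event); // "stream"
```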
## Limitations

Streaming SSE from Durable Endpoints is currently in beta. In addition to any general Durable Endpoint limitations, the following apply:

- 15-minute timeout — Client connections time out after 15 minutes, so your endpoint should complete within this window (including any retries) to ensure the stream is delivered end-to-end.
- No rollback outside of steps — Data streamed outside of a `step.run()` is never rolled back. If you need rollback guarantees, stream from within a step.
- One streaming parallel step — You can stream from at most one parallel step. Streaming from multiple parallel steps results in interleaved output that the client cannot disambiguate.
- No streaming from child functions — `step.invoke()` calls cannot stream data back to the parent function's client.
- Raw `Response` objects may be lost on async transition — If your endpoint returns a `Response` (like a file download) and goes async, the `Response` is lost because it can't be memoized. Use `stream.push()` or `stream.pipe()` instead.
## SDK support

| SDK | Support | Version |
|---|---|---|
| TypeScript | Beta | >= 4.x (with `endpointAdapter`) |