Streaming SSE from Durable Endpoints (Beta)

Durable Endpoints can stream data back to clients in real-time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data — while keeping the durability guarantees of durable steps.

Streaming works across multiple steps within a single endpoint invocation, and handles the transition from sync to async mode seamlessly. If a step fails and retries, any data streamed during that step is automatically rolled back on the client.

Streaming SSE from Durable Endpoints is currently only available in the TypeScript SDK.

When to use streaming

  • AI inference — Stream LLM tokens to the browser as they're generated, so users see results immediately.
  • Status updates — Send progress messages during long-running endpoint executions.
  • Making existing streaming endpoints durable — Wrap your existing streaming HTTP endpoints with steps to add retries and observability without losing any streaming functionality.

Quick start

Server

Import stream from inngest alongside step, then use stream.push() or stream.pipe() inside your endpoint handler:

import Anthropic from "@anthropic-ai/sdk";
import { step, stream } from "inngest";
import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async (req) => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });

    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });

    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return Response.json("\nDone!");
});

Client

Use streamRun() from inngest/durable-endpoints to consume the stream. It connects to your endpoint, parses the SSE frames, follows any sync-to-async redirects, and manages rollback — all automatically.

Each stream.push() call or stream.pipe() yield on the server fires the onData callback on the client with that chunk. Using the server example above, the client would receive chunks in this order: "Generating...\n", then each LLM token from the generate step, then "\nTranslating...\n", then each LLM token from the translate step, and finally "\nDone!" as the result.

"use client";

import { useState } from "react";
import { streamRun } from "inngest/durable-endpoints";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);

  async function run() {
    setChunks([]);

    await streamRun<string>("/api/generate", {
      parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
      onData: (chunk) => {
        setChunks((prev) => [...prev, chunk]);
      },
      onRollback: (count) => {
        // A step failed and will retry — remove the chunks it produced
        setChunks((prev) => prev.slice(0, prev.length - count));
      },
      onResult: (data) => {
        setChunks((prev) => [...prev, String(data)]);
      },
    });
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}

Server API

stream.push(data)

Send a single chunk of data to the client as an SSE frame.

stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
  • Accepts any JSON-serializable value.
  • Fire-and-forget — does not return a value or block execution.
  • Synchronous from the caller's perspective.
  • Silent no-op outside of an Inngest execution context, so your code works the same when called outside of a durable endpoint.

push() is ideal for one-off status messages. It can also be used with provider SDK event callbacks for streaming, though pipe() is usually simpler for that use case:

// Alternative to pipe() — use push() with an event callback
const response = client.messages.stream({ /* ... */ });
response.on("text", (text) => stream.push(text));
const fullText = await response.finalText();

stream.pipe(source)

Pipe a stream of data to the client. Each chunk from the source is sent as an SSE frame in real-time. When the source is fully consumed, pipe() resolves with the concatenated text of all chunks — so it both streams to the client and collects the result for you.

The simplest case is piping a ReadableStream, like a fetch response body:

const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk

When you need to transform or filter chunks before they're sent, pass an async generator function. Each yield sends one chunk to the client:

const text = await stream.pipe(async function* () {
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (event.type === "content_block_delta") {
      yield event.delta.text;
    }
  }
});

pipe() accepts three source types:

  • ReadableStream — piped directly, decoded from bytes to string chunks.
  • AsyncIterable<string> — each value in the iterable becomes a chunk.
  • () => AsyncIterable<string> — a function that returns an async iterable. This is what lets you pass async function* generators directly to pipe().

Outside of an Inngest execution context, pipe() resolves with an empty string.
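The three source types can all be reduced to one shape by normalizing each into an async iterable of strings. The sketch below is a hypothetical helper illustrating that normalization and pipe()'s collect-while-streaming behavior — it is not the SDK's implementation:

```typescript
// Hypothetical sketch of how pipe()'s three source types can be normalized.
type PipeSource =
  | ReadableStream
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

async function* normalize(source: PipeSource): AsyncGenerator<string> {
  if (typeof source === "function") {
    // () => AsyncIterable<string> — call it, then iterate the result.
    yield* source();
  } else if (source instanceof ReadableStream) {
    // ReadableStream — read chunks, decoding bytes to string chunks.
    const reader = source.getReader();
    const decoder = new TextDecoder();
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      yield typeof value === "string" ? value : decoder.decode(value, { stream: true });
    }
  } else {
    // AsyncIterable<string> — each value becomes a chunk.
    yield* source;
  }
}

// Mirrors pipe()'s return value: the concatenated text of all chunks.
async function collect(source: PipeSource): Promise<string> {
  let text = "";
  for await (const chunk of normalize(source)) text += chunk;
  return text;
}
```

In the real API, each chunk produced by normalization would also be sent to the client as an SSE frame before being appended to the collected result.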

Client API

streamRun(url, options)

The primary way to consume a streaming Durable Endpoint. Import it from inngest/durable-endpoints:

import { streamRun } from "inngest/durable-endpoints";

streamRun() returns a thenable — await it to drive the stream to completion. The three core callbacks handle the vast majority of use cases:

  • onData(chunk) — Called for each chunk. Each stream.push() call or stream.pipe() yield on the server produces one onData call on the client.
  • onRollback(count) — Called when a step fails and will retry. count is the number of chunks from that step that have been removed. Update your UI to discard them.
  • onResult(data) — Called with the endpoint's return value when execution completes.

Because stream.push() accepts any JSON-serializable value, chunks are typed as unknown by default. Use the parse option to narrow them to a specific type — it receives the deserialized value (not raw text) and its return type becomes the TData generic:

await streamRun<string>("/api/generate", {
  parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
  onData: (chunk) => { /* chunk is now typed as string */ },
  onRollback: (count) => { /* ... */ },
  onResult: (data) => { /* ... */ },
});

Additional options

  • signal — AbortSignal. Cancel the stream.
  • fetch — typeof fetch. Custom fetch implementation.
  • onStepRunning — (stepId: string, data?: unknown) => void. Called when a step starts executing; data is step metadata, if any.
  • onStepCompleted — (stepId: string, data?: unknown) => void. Called when a step completes; data is the step's return value.
  • onStepErrored — (stepId: string, info: { willRetry: boolean; error: string }) => void. Called when a step errors.
  • onMetadata — (runId: string) => void. Called when run metadata is received (always first).
  • onDone — () => void. Called when the stream is fully consumed.
  • onError — (error: unknown) => void. Called on a stream-level error (network failure, non-200 response, etc.).
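The signal option follows the standard AbortSignal pattern: once the signal aborts, the consumer stops reading. A self-contained sketch of that cancellation flow, using a stand-in chunk source rather than a real network stream:

```typescript
// Stand-in consumer that honors an AbortSignal, illustrating the
// cancellation semantics of streamRun()'s `signal` option.
async function consume(
  chunks: AsyncIterable<string>,
  signal: AbortSignal,
  onData: (chunk: string) => void,
): Promise<void> {
  for await (const chunk of chunks) {
    if (signal.aborted) return; // stop reading once the caller cancels
    onData(chunk);
  }
}

async function* source() {
  for (const c of ["a", "b", "c", "d"]) yield c;
}

const controller = new AbortController();
const seen: string[] = [];
await consume(source(), controller.signal, (c) => {
  seen.push(c);
  if (c === "b") controller.abort(); // cancel mid-stream, as a client might
});
// `seen` now holds only the chunks received before the abort took effect.
```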

Async iteration

streamRun() also implements Symbol.asyncIterator, so you can use for await to consume parsed data chunks directly. Callbacks still fire during iteration.

const run = streamRun<string>("/api/generate", {
  parse: (d) => (typeof d === "string" ? d : JSON.stringify(d)),
  onRollback: (count) => { /* ... */ },
});

for await (const chunk of run) {
  console.log(chunk);
}

A streamRun() instance can only be consumed once — either by awaiting it or by iterating it. Attempting to consume it a second time will throw an error.
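The consume-once contract can be pictured as a handle that is both a thenable and an async iterable, where either form of consumption claims the stream. The class below is a simplified hypothetical model, not streamRun()'s implementation:

```typescript
// Simplified sketch of a consume-once handle that is both awaitable
// (thenable) and async-iterable, mirroring the behavior described above.
class OnceStream<T> implements AsyncIterable<T> {
  private consumed = false;
  private readonly chunks: T[];

  constructor(chunks: T[]) {
    this.chunks = chunks;
  }

  private claim() {
    if (this.consumed) throw new Error("stream already consumed");
    this.consumed = true;
  }

  // Iterating claims the stream on the first chunk.
  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    this.claim();
    yield* this.chunks;
  }

  // Thenable: awaiting the instance drains it and resolves with all chunks.
  then<R>(onFulfilled: (chunks: T[]) => R): Promise<R> {
    this.claim();
    return Promise.resolve(onFulfilled(this.chunks));
  }
}
```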

subscribeToRun(options) — Low-level

For advanced use cases, subscribeToRun() is a low-level async generator that yields raw SSE frames. It handles redirect-following transparently but does not manage rollback or chunk accumulation.

import { subscribeToRun } from "inngest/durable-endpoints";

for await (const frame of subscribeToRun({ url: "/api/generate" })) {
  switch (frame.type) {
    case "stream":
      console.log("Data:", frame.data, "Step:", frame.step_id);
      break;
    case "inngest.step":
      console.log(`Step ${frame.step_id}: ${frame.status}`);
      break;
    case "inngest.result":
      console.log("Result:", frame.data);
      break;
    case "inngest.metadata":
      console.log("Run ID:", frame.run_id);
      break;
    case "inngest.redirect_info":
      // Handled automatically — included for visibility
      console.log("Redirecting to async stream");
      break;
  }
}

How it works

Sync-to-async transitions

When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to step.sleep(), step.waitForEvent(), or a retry), the SDK sends a redirect frame telling the client where to reconnect, and the stream continues through the Inngest server.

streamRun() handles this redirect automatically — the client sees a single continuous stream regardless of sync-to-async transitions.

Streaming activation

Streaming is activated lazily. The endpoint only sends an SSE response if:

  • The client sends the Accept: text/event-stream header (which streamRun() sets automatically), and
  • Your code calls stream.push() or stream.pipe() during execution.

If neither push() nor pipe() is called, the endpoint behaves like a regular non-streaming Durable Endpoint.
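The activation rule boils down to a two-condition check. A minimal sketch of that decision (a hypothetical helper name, not part of the SDK):

```typescript
// Sketch of the lazy activation rule: respond with SSE only if the client
// accepts it AND the handler actually streamed something.
function responseMode(acceptHeader: string | null, didStream: boolean): "sse" | "json" {
  const wantsSSE = acceptHeader?.includes("text/event-stream") ?? false;
  return wantsSSE && didStream ? "sse" : "json";
}
```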

Rollback on retry

When a step fails and retries, any data streamed during that step is automatically rolled back on the client:

  1. Each chunk is tagged with the step that produced it.
  2. When a step completes successfully, its chunks are "committed" — they can never be rolled back.
  3. When a step errors, uncommitted chunks from that step are removed, and the onRollback callback fires with the count of removed chunks.
  4. On the retry attempt, the step streams fresh data that replaces what was rolled back.

Data streamed outside of a step.run() is never rolled back.
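The four rules above amount to simple client-side bookkeeping: tag each chunk with its step, commit on success, drop uncommitted chunks on error. The class below is a hypothetical model of that bookkeeping — streamRun() handles all of this for you:

```typescript
// Client-side rollback bookkeeping, sketched from the rules above.
type Chunk = { stepId: string | null; data: string };

class ChunkBuffer {
  private chunks: Chunk[] = [];
  private committed = new Set<string>();

  // stepId is null for data streamed outside of a step.run().
  push(stepId: string | null, data: string) {
    this.chunks.push({ stepId, data });
  }

  // A step completed: its chunks can never be rolled back.
  commit(stepId: string) {
    this.committed.add(stepId);
  }

  // A step errored: drop its uncommitted chunks and return how many
  // were removed (the `count` passed to onRollback).
  rollback(stepId: string): number {
    if (this.committed.has(stepId)) return 0;
    const before = this.chunks.length;
    this.chunks = this.chunks.filter((c) => c.stepId !== stepId);
    return before - this.chunks.length;
  }

  text() {
    return this.chunks.map((c) => c.data).join("");
  }
}
```

Note that chunks tagged with a null step id are never touched by rollback, matching the rule that data streamed outside of a step is never rolled back.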

SSE frame types

The stream uses SSE with these frame types. The inngest.* frames are internal protocol frames handled by streamRun() automatically — only stream frames contain user data.

  • inngest.metadata — payload: { run_id }. Always first; identifies the run.
  • stream — payload: { data, step_id? }. User data from push() / pipe().
  • inngest.step — payload: { step_id, status, ... }. Step lifecycle boundaries (running, completed, errored).
  • inngest.redirect_info — payload: { run_id, token, url? }. Tells the client to reconnect for async continuation.
  • inngest.result — payload: the endpoint's return value. Terminal frame; closes the stream.
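On the wire, each frame is a standard SSE block with an event: line naming the frame type and a data: line carrying the JSON payload. A minimal sketch of parsing one such frame — subscribeToRun() does this parsing for you:

```typescript
// Parse one raw SSE frame block into { event, data }, following the
// standard `event:` / `data:` line format.
function parseFrame(raw: string): { event: string; data: unknown } {
  let event = "message"; // SSE default when no event: line is present
  const dataLines: string[] = [];
  for (const line of raw.split("\n")) {
    if (line.startsWith("event:")) event = line.slice(6).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
  }
  // Per the SSE format, multiple data: lines are joined with newlines.
  return { event, data: JSON.parse(dataLines.join("\n")) };
}
```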

Limitations

Streaming SSE from Durable Endpoints is currently in beta. In addition to any general Durable Endpoint limitations, the following apply:

  • 15-minute timeout — Client connections time out after 15 minutes, so your endpoint (including any retries) must complete within that window for the stream to be delivered end-to-end.
  • No rollback outside of steps — Data streamed outside of a step.run() is never rolled back. If you need rollback guarantees, stream from within a step.
  • One streaming parallel step — You can stream from at most one parallel step. Streaming from multiple parallel steps will result in interleaved output that cannot be disambiguated by the client.
  • No streaming from child functions — step.invoke() calls cannot stream data back to the parent function's client.
  • Raw Response objects may be lost on async transition — If your endpoint returns a Response (like a file download) and goes async, the Response is lost because it can't be memoized. Use stream.push() or stream.pipe() instead.

SDK support

  • TypeScript — Beta, supported in versions >= 4.x (with endpointAdapter).