AgentKit integrates with Inngest’s Realtime API, enabling you to stream updates to your AI Agent’s UI.

This guide will show you how to stream updates to an example Next.js app.

Streaming updates to a Next.js app

Let’s add a simple UI with streamed updates to our Quickstart Database AI Agent composed of two specialized Agents: a Database Administrator and a Security Expert.

Our Database AI Agent now features a realtime chat UI

To enable our Agents to stream updates to the UI, we’ll need to:

  1. Update our Inngest client configuration
  2. Create a channel for our Agents to publish updates to
  3. Update our Agents to publish updates to the UI
  4. Set up the frontend to subscribe to the updates

1. Updating the Inngest client configuration

Create or update your Inngest client as follows:

lib/inngest/client.ts
import { realtimeMiddleware } from "@inngest/realtime";
import { Inngest } from "inngest";

export const inngest = new Inngest({
  id: "realtime-ui-agent-kit-nextjs",
  middleware: [realtimeMiddleware()],
});

This will enable the Realtime API to be used in your Inngest functions.

2. Create a channel for our Agents to publish updates to

In a dedicated file or above your existing Inngest function, create a Realtime channel as follows:

lib/inngest/functions.ts
import { channel, topic } from "@inngest/realtime";

// create a channel for each discussion, given a thread ID. A channel is a namespace for one or more topics of streams.
export const databaseAgentChannel = channel(
  (threadId: string) => `thread:${threadId}`
)
  // Add a specific topic, eg. "ai" for all AI data within the user's channel
  .addTopic(
    topic("messages").schema(
      z.object({
        message: z.string(),
        id: z.string(),
      })
    )
  )
  .addTopic(
    topic("status").schema(
      z.object({
        status: z.enum(["running", "completed", "error"]),
      })
    )
  );

Our databaseAgentChannel takes a unique threadId as an argument, ensuring that each discussion has its own channel.

We also added two topics to the channel:

  • messages: For all messages sent by the Agents
  • status: For global status updates

3. Enabling our Agents to publish updates to the UI

To enable our Agents to stream updates to the UI, we need to move our Agents definition inside an Inngest function. By doing so, our Agents’ tools will get access to the publish() function, which we’ll use to publish updates to the UI:

lib/inngest/functions.ts
export const databaseAgentFunction = inngest.createFunction(
  {
    id: "database-agent",
  },
  {
    event: "database-agent/run",
  },
  async ({ event, publish }) => {
    const { query, threadId } = event.data;

    await publish(databaseAgentChannel(threadId).status({ status: "running" }));

    const dbaAgent = createAgent({
      name: "Database administrator",
      description: "Provides expert support for managing PostgreSQL databases",
      system:
        "You are a PostgreSQL expert database administrator. " +
        "You only provide answers to questions linked to Postgres database schema, indexes, extensions.",
      model: anthropic({
        model: "claude-3-5-haiku-latest",
        defaultParameters: {
          max_tokens: 4096,
        },
      }),
      tools: [
        createTool({
          name: "provide_answer",
          description: "Provide the answer to the questions",
          parameters: z.object({
            answer: z.string(),
          }),
          handler: async (
            { answer },
            { network }: Tool.Options<NetworkState>
          ) => {
            network.state.data.dba_agent_answer = answer;

            await publish(
              databaseAgentChannel(threadId).messages({
                message: `The Database administrator Agent has the following recommendation: ${network.state.data.dba_agent_answer}`,
                id: crypto.randomUUID(),
              })
            );
          },
        }),
      ],
    });

    // securityAgent and network definitions...

    await network.run(query);

    await publish(
      databaseAgentChannel(threadId).status({ status: "completed" })
    );
  }
);

publish() takes a channel topic as an argument, ensuring end-to-end type safety when writing your publish calls.

All messages sent using publish() are guaranteed to be delivered at most once with the lowest latency possible.

Your Inngest Function needs to be served via a Next.js API route: see the example for more details.

4. Build the frontend to subscribe to the updates

Our Database AI Agent is now ready to stream updates to the UI.

Triggering the Agent

First, we’ll need to trigger our Agent with a unique threadId as follows. In a Next.js application, triggering Inngest functions can be achieved using a Server Action:

app/actions.ts
"use server";

import { randomUUID } from "crypto";

export async function runDatabaseAgent(query: string) {
  const threadId = randomUUID();
  await inngest.send({
    name: "database-agent/run",
    data: { threadId, query },
  });

  return threadId;
}

Subscribing to the updates

Now, we’ll need to subscribe to the updates in our Next.js app using Inngest Realtime’s useInngestSubscription hook:

app/page.tsx
"use client";
import { useInngestSubscription } from "@inngest/realtime/hooks";
import { useCallback, useState } from "react";
import { fetchSubscriptionToken, runDatabaseAgent } from "./actions";
import { databaseAgentChannel } from "@/lib/inngest/functions";
import { Realtime } from "@inngest/realtime";

export default function Home() {
  const [query, setQuery] = useState("");
  const [inputValue, setInputValue] = useState("");
  const [threadId, setThreadId] = useState<string | undefined>(undefined);
  const [subscriptionToken, setSubscriptionToken] = useState<
    | Realtime.Token<typeof databaseAgentChannel, ["messages", "status"]>
    | undefined
  >(undefined);

  const { data } = useInngestSubscription({
    token: subscriptionToken,
  });

  const startChat = useCallback(async () => {
    setInputValue("");
    const threadId = await runDatabaseAgent(inputValue);
    setThreadId(threadId);
    setQuery(inputValue);
    setSubscriptionToken(await fetchSubscriptionToken(threadId));
  }, [inputValue]);

  const onKeyDown = useCallback(
    (e: React.KeyboardEvent<HTMLInputElement>) => {
      if (e.key === "Enter") {
        startChat();
      }
    },
    [startChat]
  );

  return (
    // UI ...
  )
}

Looking at the highlighted lines, we can see that the flow is as follows:

  1. The startChat() callback is called when the user clicks the “Run” button or presses Enter.
  2. The startChat() callback calls the runDatabaseAgent() server action to trigger the Agent.
  3. The runDatabaseAgent() server action generates a unique threadId and sends it to the Agent.
  4. The fetchSubscriptionToken() server action fetches a subscription token for the threadId.
  5. The useInngestSubscription() hook subscribes to the messages and status topics and updates the UI in realtime.

Then, the rendering part of the component gets access to a fully typed data object, which contains the latest updates from the Agent:

JSX example using the fully typed data object
{
  data.map((message, idx) =>
    message.topic === "messages" ? (
      <div
        key={`${message.topic}-${message.data.id}`}
        className="flex w-full mb-2 justify-start"
      >
        <div className="max-w-[80%] px-4 py-2 rounded-lg text-sm whitespace-pre-line break-words shadow-md bg-[#232329] text-[#e5e5e5] rounded-bl-none border border-[#232329]">
          {message.data.message}
        </div>
      </div>
    ) : (
      <div
        key={`status-update-${idx}`}
        className="flex w-full mb-2 justify-start"
      >
        <div className="max-w-[80%] px-4 py-2 rounded-lg text-sm whitespace-pre-line break-words shadow-md bg-[#313136] text-[#e5e5e5] rounded-bl-none border border-[#232329]">
          {message.data.status === "completed"
            ? "Here are my recommendations, feel free to ask me anything else!"
            : message.data.status === "error"
              ? "I faced an error, please try again."
              : "Interesting question, I'm thinking..."}
        </div>
      </div>
    )
  );
}

For more details on how to use the useInngestSubscription() hook, please refer to the Inngest Realtime API documentation.