MongoDB Vector Search for Voice Agents: Build Persistent Memory in Python

A voice agent that forgets you between calls starts every conversation from zero. It can't greet you by name, recall what you asked last time, or pick up where you left off. Personalization, retrieval-augmented generation over a knowledge base, and memory that carries across sessions all need somewhere durable to live. MongoDB Atlas gives you one home for all three. Flexible schemas hold profiles and session reports, an aggregation pipeline does the heavy lifting, and $vectorSearch runs next to the rest of your data.

This guide walks through five integration patterns for wiring Atlas into a LiveKit voice agent. Every snippet is taken from a working starter kit you can clone.

Why persistent state matters in voice

Voice runs on a tighter latency budget than chat. A two-second pause that reads as "thinking" in a chat window reads as "broken" out loud, so anything you ask the agent to remember or look up has to fit inside one user-stops-speaking to agent-starts-speaking round trip. A bloated system prompt loses to a focused one every time.

Three jobs in particular benefit from moving out of the prompt and into a database.

  • Personalization. The agent should know who is on the call before it says hello.
  • Knowledge. The agent should answer questions about your product or domain without memorizing everything.
  • Memory. The agent should remember what was said last time, last week, last quarter.

Atlas fits all of this because the document model maps cleanly to profiles and transcripts, and vector search lives right beside that data.

The five integration points

LiveKit Agents exposes a small set of hooks and lifecycle callbacks that map one-to-one onto database operations. Mix and match.

Pattern | LiveKit hook | MongoDB feature
1. RAG as a function tool | @function_tool | $vectorSearch aggregation
2. Agentic memory | @function_tool | $vectorSearch with filter fields
3. Identify + pre-load | Agent dispatch metadata + entrypoint | find_one_and_update upsert on users
4. Function-tool CRUD | @function_tool | Any PyMongo async op
5. Session persistence | on_session_end callback | insert_one on sessions

The starter puts all five in a single MongoAgent class.

Setting up the project

Both halves are bootstrapped from official LiveKit templates.

lk agent init agent --template agent-starter-python
lk agent init frontend --template agent-starter-react

The agent template adds pymongo>=4.13 for the async client and voyageai for embeddings. No provider plugins for STT, LLM, or TTS are needed because LiveKit Inference handles those through your LiveKit credentials.

The agent needs five environment variables: three LiveKit credentials, the MongoDB connection string, and a Voyage API key.

LIVEKIT_URL=
LIVEKIT_API_KEY=
LIVEKIT_API_SECRET=
MONGODB_URI=
VOYAGE_API_KEY=

The voice pipeline

Before we get to MongoDB, here is the agent's pipeline. Everything is configured through inference.STT, inference.LLM, and inference.TTS, so you swap providers by changing a string.

session = AgentSession(
    stt=inference.STT(model="deepgram/nova-3", language="multi"),
    llm=inference.LLM(model="openai/gpt-5.3-chat-latest"),
    tts=inference.TTS(
        model="cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"
    ),
    vad=ctx.proc.userdata["vad"],
    turn_handling=TurnHandlingOptions(
        turn_detection=MultilingualModel(),
        preemptive_generation={"enabled": True},
    ),
)

None of that is MongoDB-specific, but it sets the timing budget the database work has to live within. The agent class is registered with the AgentServer under a name the frontend dispatches to, with an on_session_end callback that Pattern 5 covers.

Pattern 1: RAG as a function tool

The LLM knows when it needs facts. Small talk, confirmations, and greetings don't need a knowledge-base lookup, and running a vector search on every turn burns embedding calls you didn't need. A function tool lets the model decide.

The tradeoff is latency. A tool call adds a hop on top of the vector search, and in voice that hop is dead air. To hide it, borrow a tiny pattern from LiveKit's user feedback guide. Schedule a verbal status update on a short delay, then cancel it if the search finishes first.

The vector search itself lives in a small helper.

async def _vector_search_knowledge(
    db: AsyncDatabase, query: str, limit: int = 3
) -> list[dict]:
    """Run the knowledge vector search and return {title, content} docs."""
    query_embedding = await embed_text(query, input_type="query")
    pipeline = [
        {
            "$vectorSearch": {
                "index": "knowledge_embedding_index",
                "path": "embedding",
                "queryVector": query_embedding,
                "numCandidates": 100,
                "limit": limit,
            }
        },
        {"$project": {"title": 1, "content": 1, "_id": 0}},
    ]
    cursor = await db.knowledge.aggregate(pipeline)
    return await cursor.to_list(length=limit)

$vectorSearch takes a pre-computed query vector, so we embed the query through Voyage first. In the PyMongo async API, aggregate() is a coroutine that returns an async command cursor, so the helper awaits both db.knowledge.aggregate(pipeline) and cursor.to_list(length=limit).

The tool itself is dominated by the status-update pattern, not the database call.

@function_tool()
async def search_knowledge(
    self, context: RunContext, query: str
) -> str:
    """Search the shared knowledge base for facts the user asks about."""

    async def _speak_status_update(delay: float = 0.5) -> None:
        await asyncio.sleep(delay)
        await context.session.generate_reply(
            instructions=(
                f"You are searching the knowledge base for '{query}' "
                "but it is taking a moment. Give the user a brief, "
                "one-sentence update that you are looking it up."
            )
        )

    status_task = asyncio.create_task(_speak_status_update(0.5))
    try:
        db = await get_db()
        results = await _vector_search_knowledge(db, query, limit=3)
    finally:
        status_task.cancel()
    return json.dumps({"results": results})

The timer fires only if the search takes longer than 500ms. On a fast path the task is cancelled in the finally block before asyncio.sleep resolves, so the user never hears a filler phrase. On a slow path the model says something like "just a moment, looking that up" and then answers normally once search_knowledge returns.
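The timing behavior described above can be reproduced without LiveKit or MongoDB. The following is a minimal sketch using only asyncio; run_with_status, say_wait, and search are hypothetical names for illustration and are not part of the starter kit.

```python
import asyncio


async def run_with_status(work, on_slow, delay: float = 0.5):
    """Await `work`; call `on_slow` only if `work` is still running after `delay`."""

    async def _notify() -> None:
        await asyncio.sleep(delay)
        await on_slow()

    status_task = asyncio.create_task(_notify())
    try:
        return await work
    finally:
        # No-op if the notifier already finished; otherwise it never speaks.
        status_task.cancel()


async def _demo():
    spoken: list[str] = []

    async def say_wait() -> None:
        spoken.append("one moment")

    async def search(seconds: float, value: str) -> str:
        await asyncio.sleep(seconds)  # stands in for the vector search
        return value

    fast = await run_with_status(search(0.01, "fast"), say_wait, delay=0.2)
    slow = await run_with_status(search(0.3, "slow"), say_wait, delay=0.05)
    return fast, slow, spoken


demo_result = asyncio.run(_demo())
```

Only the slow call triggers the filler phrase. The fast call cancels the notifier while it is still sleeping.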

The vector index is created once on Atlas with SearchIndexModel.

SearchIndexModel(
    definition={
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": EMBEDDING_DIMENSIONS,
                "similarity": "cosine",
            },
            {"type": "filter", "path": "user_id"},
            {"type": "filter", "path": "tenant_id"},
        ]
    },
    name=name,
    type="vectorSearch",
)

Filter fields matter. $vectorSearch lets you pre-filter candidates with a filter clause, but only on fields declared in the index. Pattern 2 uses those filters to keep one user's memories out of another user's recall.
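As a rough illustration of pre-filtering, the sketch below builds a $vectorSearch stage scoped to one user and tenant. The helper name scoped_vector_stage and its defaults are assumptions made for this example. The index name and the two filter paths come from the snippets in this guide.

```python
from typing import Any


def scoped_vector_stage(
    query_vector: list[float],
    user_id: str,
    tenant_id: str,
    index: str = "memories_embedding_index",
    limit: int = 5,
    num_candidates: int = 100,
) -> dict[str, Any]:
    """Build a $vectorSearch stage pre-filtered to one user and tenant."""
    return {
        "$vectorSearch": {
            "index": index,
            "path": "embedding",
            "queryVector": query_vector,
            "numCandidates": num_candidates,
            "limit": limit,
            # Both paths must be declared as "filter" fields in the index.
            "filter": {
                "$and": [
                    {"user_id": {"$eq": user_id}},
                    {"tenant_id": {"$eq": tenant_id}},
                ]
            },
        }
    }


stage = scoped_vector_stage([0.1, 0.2, 0.3], "u-123", "default")
```

Because the filter is applied before candidates are ranked, another user's documents never enter the candidate set.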

Pattern 2: Agentic memory as tools

RAG handles knowledge that exists ahead of time. Memory handles knowledge the agent picks up during conversation. The pattern that works best for voice is to expose memory as tools and let the LLM decide what to persist. Five tools cover most cases.

  • remember_detail(memory_type, content) stores or replaces a slot.
  • recall_detail(memory_type) returns it.
  • forget_detail(memory_type) deletes it.
  • search_memories(query) runs hybrid vector and text search.
  • list_user_memories() returns every slot for this user.

Identity-like fields (name, email, preferred language, timezone) belong on the user's profile document in users, not in free-form memory slots, so Pattern 3 can load them at session start without iterating memories. A sixth tool, update_profile(field, value), writes an allow-listed set of profile fields directly to users.
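One way to enforce an allow-list is to validate the field before building the update document. This is a sketch under assumptions: the field names in ALLOWED_PROFILE_FIELDS and the helper build_profile_update are hypothetical, and the starter's actual list may differ.

```python
# Hypothetical allow-list; adjust to the fields your users documents carry.
ALLOWED_PROFILE_FIELDS = {"name", "email", "preferred_language", "timezone"}


def build_profile_update(field: str, value: str) -> dict:
    """Return a $set update for an allow-listed profile field, or raise ValueError."""
    key = field.strip().lower()
    if key not in ALLOWED_PROFILE_FIELDS:
        raise ValueError(f"Field '{field}' cannot be updated.")
    cleaned = value.strip()
    if not cleaned:
        raise ValueError(f"Field '{field}' needs a non-empty value.")
    return {"$set": {key: cleaned}}


update_doc = build_profile_update(" Name ", "Jesse ")
```

Inside a function tool, the ValueError would more naturally be a ToolError so the model can tell the user which fields it is able to change.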

Memory is modeled as slots. Each (user_id, tenant_id, memory_type) triple holds at most one value, so writing the same label twice replaces the previous entry. That matches how voice agents actually use memory (the user's current favorite color, not a log of every color).

async def remember(
    db: AsyncDatabase,
    user_id: str,
    tenant_id: str,
    memory_type: str,
    content: str,
) -> str:
    embedding = await embed_text(
        f"{memory_type}: {content}", input_type="document"
    )
    now = _now()
    await db.memories.update_one(
        {**_scope(user_id, tenant_id), "memory_type": memory_type},
        {
            "$set": {
                "content": content,
                "embedding": embedding,
                "updated_at": now,
            },
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
    )
    return f"Remembered ({memory_type}): {content}"

The embedding covers "{memory_type}: {content}" rather than content alone, so a short slot like {memory_type: "name", content: "Jesse"} still encodes that "Jesse" is a name. A unique compound index on (user_id, tenant_id, memory_type) enforces the one-value-per-slot rule under concurrent writes.
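The slot semantics can be seen in isolation with an in-memory stand-in. SlotStore below is a hypothetical illustration, not the starter's code: the dictionary key plays the role of the unique compound index, setdefault mimics $setOnInsert, and update mimics $set.

```python
from datetime import datetime, timezone


class SlotStore:
    """In-memory stand-in for the memories collection (illustration only)."""

    def __init__(self) -> None:
        self._slots: dict[tuple[str, str, str], dict] = {}

    def remember(
        self, user_id: str, tenant_id: str, memory_type: str, content: str
    ) -> dict:
        now = datetime.now(timezone.utc)
        key = (user_id, tenant_id, memory_type)  # mirrors the unique index
        doc = self._slots.setdefault(key, {"created_at": now})  # like $setOnInsert
        doc.update({"content": content, "updated_at": now})  # like $set
        return doc

    def list_for(self, user_id: str, tenant_id: str) -> dict[str, str]:
        """Return {memory_type: content} for one (user, tenant) scope."""
        return {
            mtype: doc["content"]
            for (uid, tid, mtype), doc in self._slots.items()
            if uid == user_id and tid == tenant_id
        }


store = SlotStore()
first = store.remember("u-1", "default", "favorite_color", "green")
created = first["created_at"]
store.remember("u-1", "default", "favorite_color", "blue")  # replaces green
store.remember("u-2", "default", "favorite_color", "red")  # separate scope
```

Writing the same label twice leaves one entry whose created_at is unchanged, and the second user's value stays in its own scope.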

Hybrid retrieval with $rankFusion

Exact-label recall only works when the LLM knows which label it used. Ask "what's my favorite color?" and the fact might be stored as color_preference, favorite_color, or user_color. The fix is hybrid retrieval. $rankFusion (MongoDB 8.0+) runs $vectorSearch and $search text pipelines in parallel, then merges them with Reciprocal Rank Fusion.

pipeline = [
    {
        "$rankFusion": {
            "input": {
                "pipelines": {
                    "vectorSearch": [
                        {
                            "$vectorSearch": {
                                "index": "memories_embedding_index",
                                "path": "embedding",
                                "queryVector": query_embedding,
                                "numCandidates": 100,
                                "limit": 30,
                                "filter": scope,
                            }
                        }
                    ],
                    "textSearch": [
                        {
                            "$search": {
                                "index": "memories_text_index",
                                "compound": {
                                    "should": [
                                        {"text": {"query": query, "path": "memory_type", "fuzzy": {}}},
                                        {"text": {"query": query, "path": "content", "fuzzy": {}}},
                                    ]
                                },
                            }
                        },
                        {"$match": scope},
                        {"$limit": 30},
                    ],
                }
            },
            "combination": {"weights": {"vectorSearch": 0.7, "textSearch": 0.3}},
        }
    },
    {"$limit": limit},
    {"$project": {"_id": 0, "memory_type": 1, "content": 1}},
]

The 0.7 / 0.3 weighting biases toward semantic match while keeping lexical precision for direct hits. Results come back as {memory_type, content} pairs so the LLM can follow up with recall_detail or forget_detail. Both indexes are declared in db/indexes.py. $rankFusion is an 8.0 stage, so the starter needs MongoDB 8.0+ (M10+ runs 8.0 by default).
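To build intuition for how the weights interact with rank, here is a small pure-Python sketch of weighted Reciprocal Rank Fusion. It assumes the common formulation, weight / (k + rank) with k = 60 and ranks starting at 1; treat the constant and the function name weighted_rrf as assumptions rather than a statement of what the server computes internally.

```python
def weighted_rrf(
    rankings: dict[str, list[str]],
    weights: dict[str, float],
    k: int = 60,
) -> list[str]:
    """Fuse several ranked id lists into one list, best first."""
    scores: dict[str, float] = {}
    for name, ranked_ids in rankings.items():
        weight = weights.get(name, 1.0)
        for rank, doc_id in enumerate(ranked_ids, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    # Sort by descending score; break ties by id for a stable order.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


fused = weighted_rrf(
    rankings={
        "vectorSearch": ["favorite_color", "user_color", "pet_name"],
        "textSearch": ["color_preference", "favorite_color"],
    },
    weights={"vectorSearch": 0.7, "textSearch": 0.3},
)
```

A slot that appears in both lists accumulates score from each, so favorite_color ranks first here even though the text pipeline placed it second.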

Pattern 3: Identify the user, then pre-load their context

Before we can load a profile we need a stable id, and it has to reach the agent before it speaks.

LiveKit gives you three places for that data. Job metadata is the right one for session-start identity because ctx.job.metadata is available before ctx.connect(), and the external data docs say to do any network calls in the entrypoint before ctx.connect() so the frontend doesn't render an agent participant that isn't listening yet. Participant attributes don't resolve until after connect, so reach for them when identity changes mid-call.

Server: one httpOnly cookie

The token route owns identity. On first visit it reads lk_mongo_user_cookie, mints a UUID if nothing is there, stamps the id onto the agent dispatch entry, and sends the cookie back.

// app/api/token/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { AccessToken, type AccessTokenOptions, type VideoGrant } from 'livekit-server-sdk';
import { RoomAgentDispatch, RoomConfiguration } from '@livekit/protocol';

const COOKIE_NAME = 'lk_mongo_user_cookie';
const COOKIE_MAX_AGE = 60 * 60 * 24 * 365; // one year, in seconds
const AGENT_NAME = process.env.AGENT_NAME;
const LIVEKIT_URL = process.env.LIVEKIT_URL;

export async function POST(req: NextRequest) {
  // Reuse the id from the cookie, or mint one on first visit.
  let userId = req.cookies.get(COOKIE_NAME)?.value;
  const isNewCookie = !userId;
  if (!userId) userId = crypto.randomUUID();

  // The server-owned id travels to the agent as dispatch metadata.
  const metadata = JSON.stringify({ user_id: userId, tenant_id: 'default' });
  const roomConfig = AGENT_NAME
    ? new RoomConfiguration({
        agents: [new RoomAgentDispatch({ agentName: AGENT_NAME, metadata })],
      })
    : new RoomConfiguration();

  const roomName = `voice_assistant_room_${Math.floor(Math.random() * 10_000)}`;
  // createParticipantToken is the route's token helper (not shown here).
  const participantToken = await createParticipantToken(
    { identity: `voice_assistant_user_${Math.floor(Math.random() * 10_000)}`, name: 'user' },
    roomName,
    roomConfig,
  );

  const res = NextResponse.json({ serverUrl: LIVEKIT_URL, roomName, participantName: 'user', participantToken });
  if (isNewCookie) {
    res.cookies.set({
      name: COOKIE_NAME,
      value: userId,
      httpOnly: true,
      sameSite: 'lax',
      secure: process.env.NODE_ENV === 'production',
      path: '/',
      maxAge: COOKIE_MAX_AGE,
    });
  }
  return res;
}

The cookie is httpOnly, so JavaScript on the page can't read or forge it. Same-origin fetch attaches cookies by default, so TokenSource.endpoint('/api/token') ships the cookie on every token request without extra config. Any room_config the client sends in the body is ignored. The server builds its own RoomConfiguration and stamps the verified id onto agents[0].metadata via RoomAgentDispatch, matching the custom-auth Node.js example.

Read metadata on the agent

Parse ctx.job.metadata before ctx.connect() so preload_user finishes before the agent joins the room and starts listening.

@server.rtc_session(agent_name="my-agent", on_session_end=on_session_end)
async def my_agent(ctx: JobContext) -> None:
    meta: dict[str, str] = {}
    if ctx.job.metadata:
        try:
            meta = json.loads(ctx.job.metadata)
        except json.JSONDecodeError:
            logger.warning("ctx.job.metadata was not valid JSON; using defaults")

    user_id = meta.get("user_id", DEFAULT_USER_ID)
    tenant_id = meta.get("tenant_id", DEFAULT_TENANT_ID)
    ctx.proc.userdata["user_id"] = user_id
    ctx.proc.userdata["tenant_id"] = tenant_id

    initial_ctx = await preload_user(user_id, tenant_id)
    # ... build session, start it, connect

DEFAULT_USER_ID is the fallback for console mode (uv run src/agent.py console), where there is no frontend. Stashing the id on ctx.proc.userdata gives on_session_end a place to find it on hangup without threading it through as a parameter.

Pre-load the profile

preload_user does two things. It upserts the users row so every visitor has a stable profile document, and it reads back the document plus all memory slots for this (user_id, tenant_id) scope. Both land in the ChatContext as assistant messages before the LLM speaks.

async def preload_user(user_id: str, tenant_id: str) -> ChatContext:
    """Pattern 3: upsert the user row, then seed the chat context."""
    db = await get_db()
    now = _now()
    user = await db.users.find_one_and_update(
        {"user_id": user_id},
        {
            "$set": {"last_seen_at": now},
            "$setOnInsert": {"user_id": user_id, "created_at": now},
        },
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )

    chat_ctx = ChatContext()
    name = user.get("name")
    email = user.get("email")
    prefs = user.get("preferences", {})
    if name or email or prefs:
        chat_ctx.add_message(
            role="assistant",
            content=(
                f"User profile: name={name or 'unknown'}, "
                f"email={email or 'unknown'}, preferences={prefs}."
            ),
        )
    else:
        chat_ctx.add_message(
            role="assistant",
            content=(
                f"No stored profile fields yet for user_id {user_id}. "
                "Greet them as a new user."
            ),
        )

    memories = await list_memories(db, user_id, tenant_id)
    if memories:
        lines = "\n".join(
            f"- {m['memory_type']}: {m['content']}" for m in memories
        )
        chat_ctx.add_message(
            role="assistant",
            content=f"Remembered facts from prior sessions:\n{lines}",
        )
    return chat_ctx

find_one_and_update with upsert=True creates the document if missing, stamps last_seen_at, and returns the post-write state in one round trip. The memory pass closes the loop with Pattern 2. A slot the agent wrote last Tuesday is in context on Wednesday without any extra tool calls.

The agent's on_enter calls self.session.generate_reply with an instruction to greet by name if the profile or remembered facts contain one. All of this has to finish before the agent speaks, which leaves only a few hundred milliseconds of headroom on top of TTS warmup.
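If the preload risks overrunning that budget, one option is to bound it with a deadline and fall back to an empty context. This is a sketch of that idea, not something the starter is described as doing; preload_with_deadline, the loader callables, and the 0.3 second default are all assumptions.

```python
import asyncio


async def preload_with_deadline(loader, fallback, timeout: float = 0.3):
    """Return loader()'s result, or `fallback` if it exceeds `timeout` seconds."""
    try:
        return await asyncio.wait_for(loader(), timeout=timeout)
    except asyncio.TimeoutError:
        return fallback


async def _demo():
    async def quick_loader():
        await asyncio.sleep(0.01)  # stands in for a fast database read
        return ["profile", "memories"]

    async def stalled_loader():
        await asyncio.sleep(1.0)  # stands in for a slow network path
        return ["profile", "memories"]

    loaded = await preload_with_deadline(quick_loader, [], timeout=0.2)
    timed_out = await preload_with_deadline(stalled_loader, [], timeout=0.05)
    return loaded, timed_out


deadline_demo = asyncio.run(_demo())
```

The tradeoff is that a timed-out call greets the user as a stranger, so the deadline should sit well above the database's typical response time.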

This is not authentication

The server owns the id, but it's still anonymous. Some things to keep in mind.

  • Clearing the cookie resets identity. Different browser, private window, or manual delete produces a fresh id and a fresh profile.
  • Production swap is one file. Replace the cookie read in /api/token/route.ts with your session lookup (Better-Auth, Clerk, Supabase) and fall through to the cookie only for guests. The starter ships with a NODE_ENV !== 'development' throw at the top of the route as a tripwire to delete on the same edit.
  • Migrating guests to logins. One updateMany({ user_id: cookieId }, { $set: { user_id: authedId } }) across users, memories, and sessions merges the history onto the real account.
  • The agent doesn't care. ctx.job.metadata reads the same either way.
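The guest-to-login migration from the list above can be sketched in Python as one update_many per collection. The function migrate_guest is hypothetical, and the in-memory classes exist only so the example runs without a database. With a real AsyncDatabase, a unique index on user_id or on the memory slot triple can raise a duplicate-key error when the logged-in account already has data, so a real migration needs a merge policy.

```python
import asyncio


async def migrate_guest(
    db, cookie_id: str, authed_id: str,
    collections: tuple[str, ...] = ("users", "memories", "sessions"),
) -> dict[str, int]:
    """Re-point every document owned by cookie_id to authed_id."""
    modified: dict[str, int] = {}
    for name in collections:
        result = await db[name].update_many(
            {"user_id": cookie_id}, {"$set": {"user_id": authed_id}}
        )
        modified[name] = result.modified_count
    return modified


class _FakeResult:
    def __init__(self, modified_count: int) -> None:
        self.modified_count = modified_count


class _FakeCollection:
    """Tiny in-memory stand-in that supports equality filters and $set only."""

    def __init__(self, docs: list[dict]) -> None:
        self.docs = docs

    async def update_many(self, flt: dict, update: dict) -> _FakeResult:
        count = 0
        for doc in self.docs:
            if all(doc.get(k) == v for k, v in flt.items()):
                doc.update(update["$set"])
                count += 1
        return _FakeResult(count)


fake_db = {
    "users": _FakeCollection([{"user_id": "cookie-1"}]),
    "memories": _FakeCollection([{"user_id": "cookie-1"}, {"user_id": "other"}]),
    "sessions": _FakeCollection([]),
}
migrated = asyncio.run(migrate_guest(fake_db, "cookie-1", "auth-9"))
```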

Pattern 4: Function-tool CRUD

For data the agent reads or writes on demand, @function_tool is the right surface. The example here looks up an order by ID.

@function_tool()
async def lookup_order(self, context: RunContext, order_id: str) -> str:
    """Look up an order by its ID. Returns items, total, and status."""
    db = await get_db()
    order = await db.orders.find_one({"order_id": order_id})
    if not order:
        raise ToolError(f"Order {order_id} not found.")
    return json.dumps(
        {
            "order_id": order["order_id"],
            "items": order["items"],
            "total": order["total"],
            "status": order["status"],
        }
    )

ToolError signals a recoverable failure to the LLM. The message is fed back into the model so it can apologize and ask for a different ID instead of crashing the call. For tools that mutate data, call context.disallow_interruptions() and return a confirmation string the model can read back.

Pattern 5: Session persistence with on_session_end

When a call ends, on_session_end hands you a JobContext. Call ctx.make_session_report() on it and write the result somewhere durable. Here it lands in a sessions collection.

async def on_session_end(ctx: JobContext) -> None:
    """Pattern 5: persist a session report to MongoDB on hangup."""
    try:
        report = ctx.make_session_report()
        db = await get_db()
        user_id = ctx.proc.userdata.get("user_id", DEFAULT_USER_ID)
        tenant_id = ctx.proc.userdata.get("tenant_id", DEFAULT_TENANT_ID)
        await db.sessions.insert_one(
            {
                "session_id": ctx.room.name,
                "user_id": user_id,
                "tenant_id": tenant_id,
                "room_name": ctx.room.name,
                "report": report.to_dict(),
            }
        )
        logger.info("Persisted session report for %s", ctx.room.name)
    except Exception:
        logger.exception("Failed to persist session report")
    finally:
        await aclose()

user_id and tenant_id come from ctx.proc.userdata, where Pattern 3 stashed them. Hangup-time code reads the same id that preload set.

report.to_dict() returns a JSON-friendly snapshot you can drop into MongoDB without custom serialization. A few hundred of those gives you a corpus you can aggregate right in place.
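As one example of aggregating in place, the sketch below builds a pipeline that counts sessions per user within a tenant. It relies only on the top-level user_id and tenant_id fields written by on_session_end; the helper name sessions_per_user_pipeline is an assumption, and nothing here depends on the internal shape of the report.

```python
def sessions_per_user_pipeline(tenant_id: str, top_n: int = 10) -> list[dict]:
    """Build an aggregation that ranks users by number of stored sessions."""
    return [
        {"$match": {"tenant_id": tenant_id}},
        {"$group": {"_id": "$user_id", "sessions": {"$sum": 1}}},
        {"$sort": {"sessions": -1}},
        {"$limit": top_n},
    ]


pipeline = sessions_per_user_pipeline("default", top_n=5)
```

With the async client, this would run as cursor = await db.sessions.aggregate(pipeline) followed by await cursor.to_list(length=5), mirroring the helper in Pattern 1.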

Putting the database in front of the agent

The starter's db/client.py is short on purpose.

import os

from pymongo import AsyncMongoClient
from pymongo.asynchronous.database import AsyncDatabase

# DEFAULT_DB_NAME is a module-level constant in the starter (value not shown here).
_client: AsyncMongoClient | None = None


async def get_mongo_client() -> AsyncMongoClient:
    """Create the process-wide client on first use and reuse it afterwards."""
    global _client
    if _client is None:
        uri = os.getenv("MONGODB_URI")
        if not uri:
            raise RuntimeError("MONGODB_URI environment variable is not set.")
        _client = AsyncMongoClient(uri)
    return _client


async def get_db(db_name: str | None = None) -> AsyncDatabase:
    client = await get_mongo_client()
    return client[db_name or os.getenv("MONGODB_DB", DEFAULT_DB_NAME)]

The module keeps a single AsyncMongoClient per process, and PyMongo handles pooling under the hood. For explicit lifecycle ownership, construct the client in prewarm on server.setup_fnc instead. LiveKit forks a process per job, so size your Atlas connection ceiling against replicas × concurrent jobs × maxPoolSize.
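The sizing rule is simple arithmetic, sketched below. The default of 100 matches PyMongo's default maxPoolSize; the replica count, job count, and the 1500-connection ceiling in the example are made-up inputs, so check your cluster tier's actual limit.

```python
def peak_connections(
    replicas: int, concurrent_jobs: int, max_pool_size: int = 100
) -> int:
    """Upper bound on pooled connections if every job process fills its pool."""
    return replicas * concurrent_jobs * max_pool_size


def max_pool_size_for(ceiling: int, replicas: int, concurrent_jobs: int) -> int:
    """Largest per-process maxPoolSize that stays under `ceiling`."""
    return ceiling // (replicas * concurrent_jobs)


# Hypothetical deployment: 2 agent replicas, 10 concurrent jobs each.
worst_case = peak_connections(replicas=2, concurrent_jobs=10)
safe_pool = max_pool_size_for(ceiling=1500, replicas=2, concurrent_jobs=10)
```

Here the default pool size could reach 2000 connections, while a maxPoolSize of 75 keeps the bound at 1500. The client also opens a small number of monitoring connections per server, so leave some margin.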

Running the starter kit

The agent side needs deps synced, an env file, the one-time model download, and the two MongoDB init scripts.

cd agent
uv sync
cp .env.example .env.local
# fill in MONGODB_URI, VOYAGE_API_KEY, LIVEKIT_* in .env.local
uv run src/agent.py download-files   # one-time: VAD + turn detector models
uv run -m db.indexes                 # collections and vector indexes
uv run -m db.seed                    # sample users, orders, knowledge
uv run src/agent.py console

Vector indexes need a minute or two to become queryable on Atlas after creation, so retry if a search returns nothing immediately.
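A small retry wrapper covers that warm-up window. The sketch below is generic and assumes nothing about the starter: search_with_retry is a hypothetical helper that takes any zero-argument coroutine function returning a list, and the delays are arbitrary.

```python
import asyncio


async def search_with_retry(search, attempts: int = 5, base_delay: float = 1.0):
    """Call `search()` until it returns results or the attempts run out."""
    results: list = []
    for attempt in range(attempts):
        results = await search()
        if results:
            return results
        if attempt < attempts - 1:
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2**attempt)
    return results


async def _demo():
    calls = {"count": 0}

    async def flaky_search():
        # Simulates an index that becomes queryable on the third call.
        calls["count"] += 1
        return [{"title": "Returns policy"}] if calls["count"] >= 3 else []

    found = await search_with_retry(flaky_search, attempts=5, base_delay=0.01)
    return found, calls["count"]


retry_demo = asyncio.run(_demo())
```

An empty result can also mean there is genuinely no match, so this kind of wrapper fits setup scripts and smoke tests better than the live voice path.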

What's in the frontend

The frontend is a sibling Next.js App Router project from agent-starter-react. It mints tokens server-side (Pattern 3) and runs the LiveKit client that joins the room and plays audio.

frontend/
├── app/
│   ├── api/token/route.ts                 # mints tokens, owns the user cookie
│   ├── layout.tsx
│   └── page.tsx                           # renders <App appConfig={APP_CONFIG_DEFAULTS} />
├── components/
│   ├── app/
│   │   ├── app.tsx                        # TokenSource + useSession + provider
│   │   └── view-controller.tsx            # welcome <-> active session
│   ├── agents-ui/
│   │   ├── agent-session-provider.tsx     # SessionProvider + RoomAudioRenderer
│   │   ├── start-audio-button.tsx         # browser autoplay gate
│   │   └── blocks/agent-session-view-01/  # tiles, transcript, controls
│   └── ui/                                # shadcn primitives
├── lib/utils.ts                           # shared client helpers
├── hooks/                                 # useDebug, useAgentErrors
└── app-config.ts                          # feature toggles + agentName dispatch

Connection wiring lives in components/app/app.tsx. TokenSource.endpoint('/api/token') and useSession(tokenSource, { agentName }) from @livekit/components-react own the room lifecycle. The agentName in app-config.ts has to match the agent_name on the Python side for dispatch to reach the agent. AgentSessionProvider wraps SessionProvider with RoomAudioRenderer, which plays the agent's TTS through the page.

Browser autoplay policies block playback until a user gesture, so the kit ships a StartAudioButton calling useStartAudio() on first click.

cd ../frontend
pnpm install
cp .env.example .env.local   # fill in LIVEKIT_* (same values as the agent)
pnpm dev

Both apps share LiveKit credentials. A root package.json exposes pnpm setup, pnpm db:init, pnpm db:seed, and pnpm dev for a single-command runbook.

Where to go from here

We kept the starter intentionally minimal. A few directions worth exploring.

  • Swap users, orders, and knowledge for collections that match your domain, and rewrite the seed script.
  • Add or remove @function_tool methods on MongoAgent to expose your own database operations to the LLM.
  • Try voyage-3-large for higher-quality embeddings or voyage-multilingual-2 for non-English content. For latency-sensitive flows, voyage-3.5-lite at 512 dimensions is faster than 1024.
  • Look at the LiveKit personal_shopper example for a multi-agent shape with handoffs.

Clone the starter, point it at an Atlas cluster, and you should be talking to a memory-equipped agent in under ten minutes.

Resources