Most voice agent guardrails live inside the agent's system prompt. That works for simple rules, but it tends to break down when safety logic gets complex. A prompt telling your agent to "watch for threats and escalate immediately" competes with the agent's primary job of having a natural conversation. The same model that handles conversation also enforces compliance, and in practice, complex rules can dilute the agent's conversational ability while adding latency. When the conversation model runs under strict latency constraints, adding multi-step safety reasoning often means either the safety logic stays shallow or the response time suffers.
The observer pattern for AI agent orchestration solves this by separating detection from response. A background process monitors the conversation in parallel, evaluates it with a separate LLM, and injects corrections into the active agent's context. The user never knows a second model is involved. The agent finds new instructions in its context and adjusts.
This guide walks you through building a background observer using LiveKit's Agents framework. You'll use the conversation_item_added event for real-time transcript monitoring, a separate LLM for asynchronous policy evaluation, and update_chat_ctx to inject guardrails without interrupting the conversation.
The observer pattern for AI agent orchestration
The observer runs a three-phase loop alongside the main conversation:
- Listen. Register a listener on the AgentSession's conversation_item_added event. Every time the user speaks, the observer captures the transcribed text.
- Evaluate. Send the accumulated transcript to a separate LLM with a structured evaluation prompt. This model can be slower and more capable than the conversation model because it runs asynchronously.
- Inject. When the evaluator flags an issue, copy the active agent's chat context, append a system message with the guardrail instruction, and push the updated context back. The agent picks up the new instruction on its next turn.
The observer never blocks the conversation. The user keeps talking, the agent keeps responding, and the observer evaluates in the background. If it finds something, the agent's behavior changes on the next turn.
Setting up the agent session
Here's the session setup for a ride-share support agent with a background observer. The primary agent handles the conversation while the observer monitors for policy violations.
```python
server = AgentServer()


def prewarm(proc: JobProcess) -> None:
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session(agent_name="rideshare-agent")
async def rideshare_agent(ctx: JobContext) -> None:
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession[RideData](
        userdata=RideData(),
        stt=inference.STT(model="deepgram/nova-3", language="multi"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(
            model="cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"
        ),
        turn_detection=MultilingualModel(),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    # The observer uses a more capable model for nuanced policy analysis.
    # It runs entirely in the background, never blocking the main conversation.
    observer_llm = inference.LLM(model="openai/gpt-4.1")
    start_observer(session, observer_llm)

    await session.start(
        agent=SupportAgent(),
        room=ctx.room,
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=lambda params: (
                    noise_cancellation.BVCTelephony()
                    if params.participant.kind
                    == rtc.ParticipantKind.PARTICIPANT_KIND_SIP
                    else noise_cancellation.BVC()
                ),
            ),
        ),
    )

    await ctx.connect()
```
Two things to notice here. The session uses inference.LLM(model="openai/gpt-4.1-mini") for the conversation, a fast model optimized for low latency. The observer gets inference.LLM(model="openai/gpt-4.1"), a more capable model that can handle policy analysis. Both use the LiveKit Inference Gateway, so the only credentials you need are LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET. No separate provider API keys required.
The start_observer function is synchronous: it instantiates the PolicyObserver, whose constructor attaches the event listeners, and returns immediately, so it does not block session startup.
Building the observer
The observer is a plain Python class, not an Agent subclass. It never takes control of the session, never speaks to the user, and never appears in the agent lifecycle. It just listens and injects.
Here's the core structure:
```python
class PolicyObserver:
    """
    Monitors the ride-share support conversation in parallel for policy violations.

    Runs an LLM evaluation after every user turn. If an eval is already in
    flight when a new turn arrives, a follow-up eval is scheduled to run
    immediately after, so no content is ever skipped.

    When a violation is detected, injects a [POLICY: ...] system message into
    the active agent's chat_ctx via update_chat_ctx. The agent sees the hint
    on its next reply cycle without any interruption or handoff. Each violation
    type is injected at most once per call to avoid noise.
    """

    VIOLATION_KEYS: ClassVar[list[str]] = [
        "safety_emergency",       # Caller is in immediate physical danger
        "threatening_language",   # Threats toward driver, agent, or others
        "discrimination_report",  # Discriminatory driver behavior
        "fraud_attempt",          # Fabricating or manipulating a claim
        "harassment_report",      # Sexual or persistent verbal harassment
    ]

    def __init__(self, session: AgentSession, observer_llm) -> None:
        self.session = session
        self.observer_llm = observer_llm
        self.conversation_history: list[dict] = []
        self.injected_violations: set[str] = set()  # Each type injected at most once
        self._evaluating = False
        self._pending_eval = False  # New content arrived while eval was in flight
        self._bg_tasks: set[asyncio.Task] = set()  # Keep references to prevent GC

        self._setup_listeners()
        logger.info("[OBSERVER] Policy observer attached to session")
```
The constructor takes two arguments: the AgentSession to monitor and a separate LLM instance for evaluation. The injected_violations set tracks which violation types have already been injected, preventing duplicate guardrails. Each violation type is injected at most once per session. The _evaluating flag and _pending_eval flag work together to ensure only one evaluation runs at a time while guaranteeing no user turns are skipped.
Listening to conversation events
The observer hooks into the session's conversation_item_added event to capture every user turn:
```python
def _setup_listeners(self) -> None:
    @self.session.on("conversation_item_added")
    def on_item_added(event: ConversationItemAddedEvent) -> None:
        # Only monitor user speech, not agent replies
        if event.item.role != "user":
            return

        text = "".join(c for c in event.item.content if isinstance(c, str))
        if not text.strip():
            return

        self.conversation_history.append({"role": "user", "text": text})
        logger.debug(f"[OBSERVER] Buffered: {text[:80]}")

        # Fire an eval on every user turn. If one is already running,
        # _pending_eval ensures a follow-up runs when it finishes.
        task = asyncio.create_task(self._evaluate())
        self._bg_tasks.add(task)
        task.add_done_callback(self._bg_tasks.discard)
```
The conversation_item_added event fires whenever an item is committed to the chat history, for both user and agent messages. The event carries an item property with a role field, so the observer filters for user turns only. Agent responses don't need policy monitoring.
Each user turn triggers an evaluation via asyncio.create_task(). The handler registered with session.on() is a synchronous callback, so it can't run the evaluation inline; spawning a background task keeps the session's event processing unblocked. The task reference is stored in _bg_tasks to prevent garbage collection.
Running asynchronous evaluations
The evaluation method sends the last 10 user turns to the observer LLM and parses the structured response:
```python
async def _evaluate(self) -> None:
    """Run LLM-based policy evaluation on recent conversation history.

    Only one eval runs at a time. If a new user turn arrives while an eval
    is in flight, _pending_eval is set to True. When the current eval
    finishes, it schedules one follow-up run, so no content is ever
    skipped and concurrent LLM calls never pile up.
    """
    if self._evaluating:
        self._pending_eval = True  # Content arrived mid-eval; re-run when done
        return
    self._evaluating = True
    self._pending_eval = False
    try:
        recent = self.conversation_history[-10:]
        transcript = "\n".join(f"caller: {m['text']}" for m in recent)

        prompt = f"""You are a policy compliance monitor for a ride-sharing support line.
Analyze the caller's statements below and return ONLY a JSON object, no prose.

Transcript:
{transcript}

Return this exact JSON structure:
{{
  "safety_emergency": false,
  "threatening_language": false,
  "discrimination_report": false,
  "fraud_attempt": false,
  "harassment_report": false,
  "details": ""
}}
"""
        chat_ctx = ChatContext()
        chat_ctx.add_message(role="user", content=prompt)

        response_text = ""
        async with self.observer_llm.chat(chat_ctx=chat_ctx) as stream:
            async for chunk in stream:
                if chunk.delta and chunk.delta.content:
                    response_text += chunk.delta.content

        result = self._parse_response(response_text)
        if result:
            await self._process_violations(result)

    except Exception:
        logger.exception("[OBSERVER] Evaluation error")
    finally:
        self._evaluating = False
        if self._pending_eval:
            # New content accumulated while we were running, evaluate it now.
            logger.info(
                "[OBSERVER] Pending eval, re-running for accumulated turns"
            )
            task = asyncio.create_task(self._evaluate())
            self._bg_tasks.add(task)
            task.add_done_callback(self._bg_tasks.discard)
```
The _evaluating guard at the top prevents concurrent evaluations from stacking. Without this, rapid user turns could spawn multiple overlapping LLM calls, wasting compute and producing conflicting results.
When a new turn arrives while an evaluation is running, _pending_eval is set to True. After the current evaluation finishes, it checks this flag and schedules a follow-up run. This guarantees that no user content is ever skipped, even during long evaluations.
The observer LLM is called via the streaming chat interface, using async with self.observer_llm.chat(chat_ctx=chat_ctx) as stream and yielding chunks with chunk.delta.content. The evaluation prompt asks for structured JSON with boolean fields for each violation category plus a details string, making parsing mostly deterministic.
Parsing the evaluation response
The evaluation prompt asks for JSON only, but models sometimes wrap their output in code fences or prose anyway. The parser therefore includes a fallback that extracts JSON from within surrounding text:
```python
def _parse_response(self, text: str) -> dict | None:
    try:
        return json.loads(text.strip())
    except json.JSONDecodeError:
        pass
    start, end = text.find("{"), text.rfind("}") + 1
    if start >= 0 and end > start:
        try:
            return json.loads(text[start:end])
        except json.JSONDecodeError:
            pass
    logger.warning(f"[OBSERVER] Could not parse response: {text[:100]}")
    return None
```
The first attempt tries direct parsing. If that fails, it finds the first { and last } and parses the substring between them. This handles the common case where the model wraps its JSON in markdown code fences or adds explanatory text before the response.
Injecting guardrails into the active agent
When the evaluator detects a violation, the observer injects a system message into the active agent's chat context. Here's the injection flow:
```python
async def _inject_guardrail(self, violation: str, details: str) -> None:
    """Inject a policy hint into the active agent's context.

    Copies the current chat context, appends a system message with the
    guardrail instruction, then updates the agent's context. The agent
    sees this on its next reply cycle without any interruption.
    """
    current_agent = self.session.current_agent
    if not current_agent:
        logger.warning("[OBSERVER] No active agent to inject into")
        return

    hint = self.GUARDRAIL_HINTS.get(violation, "")
    if details:
        hint = f"{hint}\n\nObserver analysis: {details}"

    ctx_copy = current_agent.chat_ctx.copy()
    ctx_copy.add_message(role="system", content=hint)
    await current_agent.update_chat_ctx(ctx_copy)
    logger.info(f"[OBSERVER] Injected guardrail: {violation}")
```
The injection is a three-step process:
1. Get the active agent via session.current_agent. This property returns whichever agent is currently handling the conversation.
2. Copy the chat context with current_agent.chat_ctx.copy(). The agent's chat context is read-only. Trying to modify it directly raises an error: "trying to modify a read-only chat context, please use .copy() and agent.update_chat_ctx() to modify the chat context." You must work with a copy.
3. Push the update with await current_agent.update_chat_ctx(ctx_copy). The agent will use this modified context for its next LLM call.
The agent never knows the observer exists. It finds a new [POLICY: ...] system message in its context and acts on it. From the agent's perspective, the instruction was always there.
Deduplication and violation processing
Each violation type is injected at most once per session. Without deduplication, the observer would inject the same guardrail every evaluation cycle, filling the agent's context with repeated instructions.
```python
async def _process_violations(self, result: dict) -> None:
    for key in self.VIOLATION_KEYS:
        if result.get(key) and key not in self.injected_violations:
            details = result.get("details", "")
            logger.warning(f"[OBSERVER] Violation: {key} — {details}")
            await self._inject_guardrail(key, details)
            self.injected_violations.add(key)
```
The injected_violations set tracks which types have been sent. Once a violation is injected, it won't be injected again even if later evaluations flag the same category. This keeps the agent's context clean.
The guardrail hints
Each violation type maps to a specific, actionable instruction for the agent. The hints follow a bracketed tag format ([POLICY: TYPE]) with instructions that reference the agent's available tools:
```python
GUARDRAIL_HINTS: ClassVar[dict[str, str]] = {
    "safety_emergency": (
        "[POLICY: SAFETY EMERGENCY] The caller may be in immediate physical danger. "
        "Use escalate_to_safety_team immediately. Keep them calm and on the line. "
        "Do not ask unnecessary questions."
    ),
    "threatening_language": (
        "[POLICY: THREATENING LANGUAGE] Threatening language has been detected. "
        "Stay calm and do not escalate. Inform the caller that all calls are recorded "
        "and threats violate our terms of service. End the call safely if threats continue."
    ),
    "discrimination_report": (
        "[POLICY: DISCRIMINATION REPORT] The caller is describing discriminatory behavior "
        "by their driver. Acknowledge their experience with empathy. Assure them this will "
        "be reviewed under our zero-tolerance policy. Use file_driver_report to formally "
        "document the report."
    ),
    "fraud_attempt": (
        "[POLICY: FRAUD FLAG] This interaction shows signs of claim manipulation. "
        "Continue collecting information normally, do not accuse the caller. "
        "The account has been flagged internally for review."
    ),
    "harassment_report": (
        "[POLICY: HARASSMENT REPORT] The caller is describing harassment by their driver. "
        "Acknowledge their experience with empathy. Use file_driver_report to document "
        "the report. Tell them we take this seriously and will follow up within 24 hours."
    ),
}
```
Each hint tells the agent what to do, not just what happened. The hints reference specific tools (escalate_to_safety_team, file_driver_report) so the agent knows which actions to take. This is more reliable than vague instructions like "handle this appropriately."
The support agent
The primary agent handles the conversation and responds to guardrail hints when they appear in its context:
```python
class SupportAgent(Agent):
    """
    Conversational ride-share support agent.

    Handles fare disputes, driver complaints, and safety concerns through
    natural conversation. The PolicyObserver runs alongside this agent and
    injects [POLICY: ...] system messages into its context when violations
    are detected. This agent acts on those hints without being explicitly
    aware of the observer.
    """

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a ride-share customer support agent. Help callers with fare "
                "disputes, driver complaints, and safety concerns. Be calm, empathetic, "
                "and professional. If you see a [POLICY: ...] alert in your context, "
                "act on it before continuing normally."
            )
        )

    async def on_enter(self) -> None:
        await self.session.generate_reply(
            instructions=(
                "Greet the caller. Tell them they've reached ride-share support and "
                "ask them to briefly describe the issue they're calling about today."
            )
        )
```
The SupportAgent subclasses Agent and uses on_enter to greet the caller via session.generate_reply. Its instructions include a single line about acting on [POLICY: ...] alerts. That's all the agent needs to know. The observer handles all the detection logic.
The agent also has tools for acting on guardrails. The escalate_to_safety_team tool handles emergency escalations, and file_driver_report documents complaints:
```python
@function_tool
async def escalate_to_safety_team(self, context: RunContext_T) -> str:
    """
    Use this when the caller is in immediate danger or when the context
    contains a [POLICY: SAFETY EMERGENCY] alert.
    """
    logger.warning(
        f"[ESCALATION] ride={context.userdata.ride_id}, "
        f"rider={context.userdata.rider_name}"
    )
    context.userdata.safety_emergency = True
    return (
        "This call has been escalated to our 24/7 safety team. "
        "A specialist will call the rider back within two minutes. "
        "Stay on the line with the caller until they confirm they are safe."
    )
```
The tool descriptions reference the specific policy tags (for example, [POLICY: SAFETY EMERGENCY]) so the model can match guardrail hints to the correct tool.
Putting it all together
Here's the start_observer function that wires everything up:
```python
def start_observer(session: AgentSession, observer_llm) -> PolicyObserver:
    """Attach a PolicyObserver to the session. Returns immediately, non-blocking."""
    return PolicyObserver(session=session, observer_llm=observer_llm)
```
The observer attaches its event listener in the constructor and starts monitoring as soon as conversation_item_added events fire. No async setup required. The full data flow looks like this:
1. The user speaks. LiveKit's STT transcribes the audio.
2. The transcription is committed to the chat history, firing a conversation_item_added event.
3. The observer captures the text and starts an async evaluation.
4. Meanwhile, the primary agent responds to the user normally.
5. The observer's LLM evaluates the transcript and returns structured JSON.
6. If a violation is detected, the observer copies the agent's context, adds a policy hint, and pushes the update.
7. On the agent's next turn, it sees the [POLICY: ...] message and adjusts its behavior.
Steps 3 through 6 happen in the background. The user and the primary agent are unaware of the evaluation.
Production considerations
Evaluation frequency. This implementation evaluates on every user turn. For high-volume conversations, you might batch evaluations every N turns or use a token-count threshold instead. The Doheny Surf Desk example uses a threshold of every 3 user turns. The tradeoff is detection latency vs. LLM cost.
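The turn-count variant can be sketched as a small gate that the event handler consults before spawning an evaluation task. The ThresholdGate class and its default of 3 are illustrative, not part of the example above:

```python
class ThresholdGate:
    """Counts user turns and signals when an evaluation is due.

    Hypothetical helper: call should_evaluate() once per user turn and only
    spawn the LLM evaluation task when it returns True.
    """

    def __init__(self, every_n: int = 3) -> None:
        self.every_n = every_n
        self._turns_since_eval = 0

    def should_evaluate(self) -> bool:
        """Returns True on every Nth user turn, then resets the counter."""
        self._turns_since_eval += 1
        if self._turns_since_eval >= self.every_n:
            self._turns_since_eval = 0
            return True
        return False
```

A token-count threshold would follow the same shape, accumulating transcript length instead of turn count.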
Model selection. The observer model should be more capable than the conversation model for complex policy analysis, but it doesn't need to be the largest model available. Test your specific violation categories and pick the smallest model that reliably detects them.
Structured output reliability. The JSON parsing includes a fallback for imperfect responses, but some models are better at structured output than others. Consider using a model that supports structured output natively, or add validation with defaults for missing fields.
Agent handoffs. If your application uses agent handoffs, the active agent might change between when the observer starts an evaluation and when it tries to inject. The session.current_agent call in _inject_guardrail always returns the current agent at injection time, so the injection goes to whoever is active at that moment, not the agent that was active when evaluation started. Verify that this is the behavior you want for your use case.
Context growth. Each injected guardrail adds a system message to the agent's context. With five violation categories and one-time deduplication, the maximum added context per session is five system messages. If you have many more categories, consider consolidating or removing stale guardrails.
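One possible pruning strategy, shown here over plain message dicts rather than the real ChatContext type, drops earlier [POLICY: ...] system messages before appending a new one:

```python
POLICY_PREFIX = "[POLICY:"


def prune_stale_guardrails(messages: list[dict], new_hint: str) -> list[dict]:
    """Remove earlier [POLICY: ...] system messages, then append the new hint.

    Keeps at most one active guardrail in the context at a time. Uses simple
    {"role": ..., "content": ...} dicts for illustration; adapting this to
    the actual chat context API is left to the reader.
    """
    kept = [
        m for m in messages
        if not (m["role"] == "system" and m["content"].startswith(POLICY_PREFIX))
    ]
    kept.append({"role": "system", "content": new_hint})
    return kept
```

Note that pruning trades context cleanliness for memory: the agent loses the earlier instruction, so only do this for guardrails that genuinely supersede one another.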
What's next
The observer pattern gives you a clean approach to AI agent orchestration, separating conversation handling from safety monitoring. Your front-line agent stays fast and focused while the observer handles the complex evaluation in the background.
To adapt this pattern for your use case, start with the violation categories that matter most for your application. Swap the ride-share policy hints for your own domain-specific guardrails. The three-phase loop (listen, evaluate, inject) stays the same regardless of what you're monitoring.
The full working example is available in the LiveKit Python agent examples. Give it a try and let us know what you're building.