AgentOpera's ChatAgent provides a flexible and powerful architecture for building intelligent agent applications. This document outlines the key components and features of the chat agent system.
Core Concepts
The AgentOpera agent system is built on several shared principles:
name: A unique identifier string for the agent
description: Text explaining the agent's purpose and capabilities
on_messages(): Primary method to process new messages and generate responses
on_messages_stream(): Stream-based variant that yields events during processing
on_reset(): Method to clear agent state and return to initial configuration
run() and run_stream(): Simplified interfaces for common task execution patterns
AssistantAgent
The AssistantAgent serves as the primary implementation class in the agent hierarchy, providing LLM-powered interaction capabilities and tool execution support.
from agentopera.chatflow.agents import AssistantAgentfrom agentopera.chatflow.messages import TextMessagefrom agentopera.engine.types.agent import CancellationTokenfrom agentopera.engine.models.openai import OpenAIChatCompletionClientimport asyncio# Set up the model clientmodel_client =OpenAIChatCompletionClient(model="gpt-4o",# api_key="YOUR_API_KEY", )# Initialize the assistant agentagent =AssistantAgent(name="assistant",model_client=model_client,system_message="You are a helpful assistant. Always provide detailed answers.")# Execute a simple queryasyncdefrun_assistant(): response =await agent.on_messages([TextMessage(content="What is deep learning?",source="user")], CancellationToken())print(response.chat_message.content)# Launch the functionasyncio.run(run_assistant())
Responses
The on_messages() method produces a Response object with two key elements:
chat_message: The final message response from the agent
inner_messages: A sequence of intermediate messages showing the agent's reasoning process
Note: Because agents maintain internal state between calls, always provide only new messages to on_messages(), and avoid passing the entire conversation history.
Message Streaming
For applications requiring real-time interaction, AgentOpera provides streaming capabilities through on_messages_stream():
The stream yields a sequence of events such as:
Agent processing events (tool calls, thought processes, etc.)
The final Response object as the last item
Tools
Agents can extend their capabilities by connecting to external functions and services:
Token Streaming
For fine-grained control over output generation, enable token streaming with model_client_stream=True:
Context Management
The AssistantAgent offers intelligent context management for handling conversation history:
Default: Full History
By default, the agent uses UnboundedChatCompletionContext, which maintains and provides the complete conversation history to the language model.
Bounded History
For long-running conversations or performance optimization, you can limit the amount of conversation history sent to the model (context window):
This approach is particularly useful for applications with extended interactions where older context becomes less relevant.
Other Agent Types
The AgentOpera framework includes several specialized agent implementations:
UserProxyAgent: Interfaces with human users, capturing their input as agent messages
CodeExecutorAgent: Specialized for executing code in various programming languages
OpenAIAssistantAgent: Adapter for OpenAI's Assistant API with additional capabilities
Each agent implementation follows the same core interface and provides domain-specific functionality.
async def examine_response_structure():
response = await agent.on_messages(
[TextMessage(content="What's the capital of France?", source="user")],
CancellationToken()
)
print("Agent response:", response.chat_message.content)
print("Reasoning steps:", [msg for msg in response.inner_messages])
from agentopera.chatflow.ui import Console
async def demonstrate_streaming():
# Method 1: Direct stream processing
async for event in agent.on_messages_stream(
[TextMessage(content="Explain reinforcement learning", source="user")],
CancellationToken()
):
print(event)
# Method 2: Using the built-in Console renderer
await Console(
agent.on_messages_stream(
[TextMessage(content="How does transfer learning work?", source="user")],
CancellationToken()
),
output_stats=True # Enable performance statistics
)
async def weather_lookup(city: str) -> str:
"""Get the current weather for a specific city."""
# In a real implementation, this would call a weather API
return f"The weather in {city} is currently sunny and 72°F."
async def currency_convert(amount: float, from_currency: str, to_currency: str) -> float:
"""Convert between currencies.
Args:
amount: The amount to convert
from_currency: Source currency code (USD, EUR, etc.)
to_currency: Target currency code
"""
# This would typically call an exchange rate API
rates = {"USD_EUR": 0.93, "EUR_USD": 1.07}
rate_key = f"{from_currency}_{to_currency}"
return amount * rates.get(rate_key, 1.0)
# Create an agent with tool capabilities
utility_agent = AssistantAgent(
name="utility_agent",
model_client=model_client,
tools=[weather_lookup, currency_convert],
system_message="Assist users by providing information. Use tools when necessary."
)
token_streaming_agent = AssistantAgent(
name="token_streamer",
model_client=model_client,
system_message="You are a helpful assistant.",
model_client_stream=True # Enable per-token streaming
)
async def display_token_stream():
async for message in token_streaming_agent.on_messages_stream(
[TextMessage(content="List five programming languages", source="user")],
CancellationToken()
):
if message.__class__.__name__ == "ModelClientStreamingChunkEvent":
print(message.content, end="", flush=True)
elif isinstance(message, Response):
print("\n\nComplete response received.")
from agentopera.engine.types.model_context import BufferedChatCompletionContext
# Create an agent with a limited conversation history
memory_efficient_agent = AssistantAgent(
name="memory_efficient",
model_client=model_client,
system_message="You are a helpful assistant.",
model_context=BufferedChatCompletionContext(buffer_size=5) # Only keeps last 5 messages
)