Message Channel
The engine/subscription module in AgentOpera provides a publish-subscribe messaging system that allows agents to communicate with each other efficiently. This document explains the core concepts, components, and usage patterns of this communication mechanism.
Core Concepts
Message Channel
A MessageChannel in AgentOpera defines the scope of a broadcast message. Conceptually, it is equivalent to a Topic in many publish-subscribe systems. It is composed of two parts:
Topic: The type of event the message channel contains. It must match a specific pattern and follow the CloudEvents specification.
Source: The context identifier where an event happened.
This structure can be represented as:
MessageChannel = (Topic, Source)
The topic specifies the type of message being sent, while the source identifies the specific instance or context of that topic type.
Subscription
A Subscription is a protocol that defines how agents express interest in certain message channels. When a message is published to a channel, it is delivered to all subscribed agents, according to the matching rules defined by their subscriptions.
Key Components
MessageChannel
@dataclass(eq=True, frozen=True)
class MessageChannel:
    """
    MessageChannel defines the scope of a broadcast message. In essence, agent engine
    implements a publish-subscribe model through its broadcast API: when publishing
    a message, the topic must be specified.
    """

    topic: str
    source: str
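Because the class is declared with eq=True and frozen=True, channels behave as immutable values. The sketch below illustrates what that implies, using a local stand-in with the same two fields rather than the AgentOpera class itself; the topic and source strings are made up for illustration.

```python
from dataclasses import dataclass, FrozenInstanceError


# Local stand-in with the same two fields as the class above
# (deliberately not imported from AgentOpera).
@dataclass(eq=True, frozen=True)
class MessageChannel:
    topic: str
    source: str


a = MessageChannel(topic="orders", source="tenant1")
b = MessageChannel(topic="orders", source="tenant1")
c = MessageChannel(topic="orders", source="tenant2")

# eq=True: channels compare by value, so a and b denote the same channel.
same_channel = a == b
different_source = a != c

# eq=True together with frozen=True makes instances hashable,
# so a channel can be used as a dictionary key.
handlers_by_channel = {a: "handler-for-tenant1", c: "handler-for-tenant2"}
lookup = handlers_by_channel[b]

# frozen=True: fields cannot be reassigned after construction.
try:
    a.topic = "changed"  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Value semantics are what allow a runtime to compare a published channel against stored subscriptions without caring which object instance was created by the publisher.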
DefaultMessageChannel
class DefaultMessageChannel(MessageChannel):
    """
    DefaultMessageChannel provides sensible defaults for topic and source fields.
    If created in the context of a message handler, the source will be set to the
    agent_id of the message handler, otherwise it will be set to "default".
    """
DefaultMessageChannel simplifies the creation of message channels by providing sensible defaults:
The topic defaults to "default"
The source automatically uses the current agent's ID if available, otherwise "default"
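The defaulting rules above can be sketched as follows. How AgentOpera actually detects that it is running inside a message handler is not shown in this document, so the ContextVar, the default_channel helper, and the "agent-123" identifier are all assumptions made for illustration.

```python
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-in for "the agent whose handler is currently running".
_current_agent_id: ContextVar[Optional[str]] = ContextVar(
    "current_agent_id", default=None
)


@dataclass(eq=True, frozen=True)
class MessageChannel:
    topic: str
    source: str


def default_channel(
    topic: str = "default", source: Optional[str] = None
) -> MessageChannel:
    """Fill in missing fields following the rules described above."""
    if source is None:
        # Use the current agent's ID when one is set, otherwise "default".
        source = _current_agent_id.get() or "default"
    return MessageChannel(topic=topic, source=source)


# Outside any handler: both fields fall back to "default".
outside_handler = default_channel()

# Simulate running inside a handler owned by a (made-up) agent ID.
token = _current_agent_id.set("agent-123")
try:
    inside_handler = default_channel()
finally:
    _current_agent_id.reset(token)

# Explicit arguments always win over the defaults.
explicit = default_channel(topic="reports", source="tenant1")
```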
Subscription Protocol
@runtime_checkable
class Subscription(Protocol):
    """Subscriptions define the topics that an agent is interested in."""

    @property
    def id(self) -> str: ...

    def is_match(self, message_channel: MessageChannel) -> bool: ...

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId: ...
The Subscription protocol defines the interface that all subscription implementations must follow:
id: A unique identifier for the subscription
is_match: Determines whether a given message channel matches the subscription
map_to_agent: Maps a matching message channel to the agent that should receive the message
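Since Subscription is a runtime-checkable Protocol, any class that provides these three members should satisfy it without inheriting from it. The sketch below shows a hypothetical PrefixSubscription that matches topics by prefix. The protocol and MessageChannel are re-declared locally so the example is self-contained, and AgentId is an assumed stand-in modelled as an (agent type, key) pair.

```python
import uuid
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(eq=True, frozen=True)
class MessageChannel:
    topic: str
    source: str


@dataclass(eq=True, frozen=True)
class AgentId:  # stand-in: assumed to be an (agent type, key) pair
    type: str
    key: str


@runtime_checkable
class Subscription(Protocol):
    @property
    def id(self) -> str: ...
    def is_match(self, message_channel: MessageChannel) -> bool: ...
    def map_to_agent(self, message_channel: MessageChannel) -> AgentId: ...


class PrefixSubscription:
    """Hypothetical subscription: matches every topic with a given prefix."""

    def __init__(self, prefix: str, agent_type: str) -> None:
        self._id = str(uuid.uuid4())
        self._prefix = prefix
        self._agent_type = agent_type

    @property
    def id(self) -> str:
        return self._id

    def is_match(self, message_channel: MessageChannel) -> bool:
        return message_channel.topic.startswith(self._prefix)

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
        if not self.is_match(message_channel):
            raise ValueError("channel does not match this subscription")
        # Reuse the channel source as the agent key.
        return AgentId(type=self._agent_type, key=message_channel.source)


sub = PrefixSubscription(prefix="billing.", agent_type="billing_agent")
channel = MessageChannel(topic="billing.invoice", source="tenant1")

# Structural check: no inheritance from Subscription is needed.
conforms = isinstance(sub, Subscription)
target = sub.map_to_agent(channel)
```

Note that isinstance against a runtime-checkable protocol only verifies that the members exist, not that their signatures are correct.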
Topic-Based Subscriptions
AgentOpera provides two primary implementations for topic-based subscriptions:
TopicSubscription
class TopicSubscription(Subscription):
    """
    This subscription matches on topics based on the topic and maps to agents
    using the source of the topic as the agent key.
    """
TopicSubscription matches message channels whose topic is exactly equal to the subscription's topic and maps them to agents where:
The agent type is predefined in the subscription
The agent key is derived from the message channel's source
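The matching and mapping rules just listed can be sketched in a few lines. ExactTopicSubscription below is a simplified stand-in written for this illustration, not the library's TopicSubscription, and AgentId is again assumed to be an (agent type, key) pair.

```python
from dataclasses import dataclass


@dataclass(eq=True, frozen=True)
class MessageChannel:
    topic: str
    source: str


@dataclass(eq=True, frozen=True)
class AgentId:  # stand-in: assumed to be an (agent type, key) pair
    type: str
    key: str


class ExactTopicSubscription:
    """Simplified sketch of the behaviour described above."""

    def __init__(self, topic: str, agent_type: str) -> None:
        self.topic = topic
        self.agent_type = agent_type

    def is_match(self, message_channel: MessageChannel) -> bool:
        # Exact string equality; no wildcards or prefixes.
        return message_channel.topic == self.topic

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
        # Agent type comes from the subscription, key from the channel source.
        return AgentId(type=self.agent_type, key=message_channel.source)


sub = ExactTopicSubscription(topic="reports", agent_type="report_agent")

first = sub.map_to_agent(MessageChannel(topic="reports", source="batch_001"))
second = sub.map_to_agent(MessageChannel(topic="reports", source="batch_002"))
near_miss = sub.is_match(MessageChannel(topic="reports.daily", source="batch_001"))
```

Two channels with the same topic but different sources map to two different agent IDs of the same type, which is the basis for the multi-tenant patterns described later.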
DefaultSubscription
class DefaultSubscription(TopicSubscription):
    """
    The default subscription is designed to be a sensible default for applications
    that only need global scope for agents.
    """
DefaultSubscription extends TopicSubscription with sensible defaults:
Topic defaults to "default"
Agent type is automatically detected from the context if not specified
Publishing Messages
The agent runtime provides a publish_message method to send messages to subscribed agents:
async def publish_message(
    self,
    message: Any,
    message_channel: MessageChannel,
    *,
    sender: AgentId | None = None,
    cancellation_token: CancellationToken | None = None,
    message_id: str | None = None,
) -> None: ...
This method:
Takes a message of any type and a message channel
Optionally accepts a sender ID, cancellation token, and message ID
Delivers the message to all agents that have subscribed to the matching channel
This is a one-way communication method and does not return responses.
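To make the fan-out behaviour concrete, here is a minimal in-memory sketch. TinyRuntime is a hypothetical toy, not the AgentOpera runtime: it models subscriptions as plain predicates, records deliveries instead of invoking handlers, and omits the sender, cancellation token, and message ID parameters.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass(eq=True, frozen=True)
class MessageChannel:
    topic: str
    source: str


class TinyRuntime:
    """Hypothetical toy runtime; only the fan-out logic is modelled."""

    def __init__(self) -> None:
        # Each entry pairs a match predicate with the agent type it feeds.
        self._subscriptions: List[Tuple[Callable[[MessageChannel], bool], str]] = []
        # Delivery log of (agent type, agent key, message).
        self.delivered: List[Tuple[str, str, Any]] = []

    def subscribe(self, is_match: Callable[[MessageChannel], bool], agent_type: str) -> None:
        self._subscriptions.append((is_match, agent_type))

    async def publish_message(self, message: Any, message_channel: MessageChannel) -> None:
        for is_match, agent_type in self._subscriptions:
            if is_match(message_channel):
                # Record the delivery instead of calling a real handler.
                self.delivered.append((agent_type, message_channel.source, message))
        # Nothing is returned: publishing is one-way.


async def main() -> TinyRuntime:
    runtime = TinyRuntime()
    runtime.subscribe(lambda ch: ch.topic == "jobs", "worker")
    runtime.subscribe(lambda ch: ch.topic == "jobs", "auditor")
    runtime.subscribe(lambda ch: ch.topic == "alerts", "pager")

    result = await runtime.publish_message({"job": 1}, MessageChannel("jobs", "tenant1"))
    assert result is None  # no response comes back to the publisher
    return runtime


runtime = asyncio.run(main())
```

Both subscribers to "jobs" receive the message, the "alerts" subscriber does not, and the publisher gets nothing back.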
Subscription Patterns
AgentOpera supports various subscription patterns similar to those in the Microsoft AutoGen framework:
1. Single-Tenant, Single Topic
All agents operate within a single tenant and communicate via a single topic:
# Create a default subscription for an agent type
await runtime.subscribe(DefaultSubscription(agent_type="agent_type_name"))

# Publish a message to the default topic
await runtime.publish_message(
    message_content,
    DefaultMessageChannel(),
)
2. Multi-Tenant, Single Topic
All tenants share one topic, and each tenant gets its own isolated channel through the source identifier:
# Create subscriptions for each agent type
await runtime.subscribe(DefaultSubscription(agent_type="agent_type_name"))

# Publish tenant-specific messages
for tenant_id in ["tenant1", "tenant2"]:
    await runtime.publish_message(
        message_content,
        DefaultMessageChannel(source=tenant_id),
    )
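The isolation in this pattern comes from the source becoming the agent key. The sketch below models that idea with plain dictionaries; it assumes that each distinct (agent type, key) pair is served by its own agent instance, which this document implies but does not state outright. The publish helper and inboxes mapping are made up for the illustration.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# (agent type, agent key) -> messages seen by that agent instance.
# Assumption: each distinct key is served by its own agent instance.
inboxes: DefaultDict[Tuple[str, str], List[str]] = defaultdict(list)


def publish(message: str, topic: str, source: str) -> None:
    """Deliver to the 'agent_type_name' instance whose key equals the source."""
    if topic == "default":  # the single shared topic of this pattern
        inboxes[("agent_type_name", source)].append(message)


for tenant_id in ["tenant1", "tenant2"]:
    publish(f"hello {tenant_id}", topic="default", source=tenant_id)

# A later message for tenant1 lands only in tenant1's inbox.
publish("second message", topic="default", source="tenant1")

tenant_count = len(inboxes)
```

One subscription is enough for any number of tenants, because the tenant is selected at publish time rather than at subscribe time.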
3. Single-Tenant, Multiple Topics
Messages are published to different topics within a single tenant:
# Create topic-specific subscriptions
await runtime.subscribe(TopicSubscription(topic="topic1", agent_type="agent_type1"))
await runtime.subscribe(TopicSubscription(topic="topic2", agent_type="agent_type2"))

# Publish to specific topics
await runtime.publish_message(
    message_content,
    MessageChannel(topic="topic1", source="default"),
)
4. Multi-Tenant, Multiple Topics
The most flexible pattern combines multiple topics with multiple tenants:
# Create topic-specific subscriptions for each agent type
await runtime.subscribe(TopicSubscription(topic="topic1", agent_type="agent_type1"))
await runtime.subscribe(TopicSubscription(topic="topic2", agent_type="agent_type2"))

# Publish tenant-specific messages to different topics
for tenant_id in ["tenant1", "tenant2"]:
    await runtime.publish_message(
        message_content,
        MessageChannel(topic="topic1", source=tenant_id),
    )
Usage Example
Here's a complete example showing how to set up subscriptions and publish messages using the agent registration pattern:
from agentopera.engine.runtime.agent_engine import AgentEngine
from agentopera.engine.protocol.subscription.default_subscription import DefaultSubscription
from agentopera.engine.protocol.subscription.topic_subscription import TopicSubscription
from agentopera.engine.protocol.subscription.message_channel import MessageChannel, DefaultMessageChannel
from agentopera.types.agent import AgentId


class AnalyzerAgent:
    # Agent implementation. The register() class method used below is assumed
    # to come from an agent base class that this excerpt does not show.
    def __init__(self, name):
        self.name = name


class ProcessorAgent:
    # Agent implementation (same assumption about register() as above)
    def __init__(self, name):
        self.name = name


async def setup_messaging(runtime: AgentEngine):
    # Register agents using the register method on the agent class
    await AnalyzerAgent.register(
        runtime,
        "analyzer_agent",
        lambda: AnalyzerAgent("analyzer_agent"),
    )
    await ProcessorAgent.register(
        runtime,
        "processor_agent",
        lambda: ProcessorAgent("processor_agent"),
    )

    # Add subscriptions for registered agents
    await runtime.subscribe(DefaultSubscription(topic="analyzer_topic", agent_type="analyzer_agent"))
    await runtime.subscribe(TopicSubscription(topic="processor_topic", agent_type="processor_agent"))

    # Publish messages to specific agents
    await runtime.publish_message(
        {"status": "processing_complete", "data": {"results": [1, 2, 3]}},
        MessageChannel(topic="analyzer_topic", source="batch_001"),
    )

    # You can also use DefaultMessageChannel for simpler cases
    await runtime.publish_message(
        "Simple message for processor",
        DefaultMessageChannel(topic="processor_topic", source="batch_001"),
    )
In this example:
Agents register themselves using the register class method, which internally configures them with the runtime
Subscriptions are added for each agent type with specific topics
Messages are published to message channels and routed to the appropriate agents based on the subscriptions
This approach follows the pattern used in production code where agents typically implement their own registration logic.
Best Practices
Use meaningful topic names that reflect the nature of the messages being published.
Structure source IDs to represent the specific instances of your application contexts.
Choose the right subscription type:
TopicSubscription for exact matching
DefaultSubscription for simple use cases