Message Channel

The engine/subscription module in AgentOpera provides a publish-subscribe messaging system that allows agents to communicate with each other efficiently. This document explains the core concepts, components, and usage patterns of this communication mechanism.

Core Concepts

Message Channel

A MessageChannel in AgentOpera defines the scope of a broadcast message. Conceptually, it is equivalent to a Topic in many publish-subscribe systems, composed of two parts:

  • Topic: The type of event the message channel contains. It must match a specific pattern and follow cloud event specifications.

  • Source: The context identifier where an event happened.

This structure can be represented as:

MessageChannel = (Topic, Source)

The topic specifies the type of message being sent, while the source identifies the specific instance or context of that topic type.

Subscription

A Subscription is a protocol that defines how agents express interest in certain message channels. When a message is published to a channel, it is delivered to all subscribed agents, according to matching rules defined by their subscriptions.

Key Components

MessageChannel

@dataclass(eq=True, frozen=True)
class MessageChannel:
    """
    MessageChannel defines the scope of a broadcast message. In essence, agent engine
    implements a publish-subscribe model through its broadcast API: when publishing
    a message, the topic must be specified.
    """
    topic: str
    source: str

DefaultMessageChannel

class DefaultMessageChannel(MessageChannel):
    """
    DefaultMessageChannel provides sensible defaults for topic and source fields.
    
    If created in the context of a message handler, the source will be set to the 
    agent_id of the message handler, otherwise it will be set to "default".
    """

DefaultMessageChannel simplifies the creation of message channels by providing sensible defaults:

  • The topic defaults to "default"

  • The source automatically uses the current agent's ID if available, otherwise "default"

Subscription Protocol

@runtime_checkable
class Subscription(Protocol):
    """Subscriptions define the topics that an agent is interested in."""
    
    @property
    def id(self) -> str: ...
    
    def is_match(self, message_channel: MessageChannel) -> bool: ...
    
    def map_to_agent(self, message_channel: MessageChannel) -> AgentId: ...

The Subscription protocol defines the interface that all subscription implementations must follow:

  • id: A unique identifier for the subscription

  • is_match: Determines if a given message channel matches the subscription

  • map_to_agent: Maps a matching message channel to an agent that should receive the message

Topic-Based Subscriptions

AgentOpera provides two primary implementations for topic-based subscriptions:

TopicSubscription

class TopicSubscription(Subscription):
    """
    This subscription matches on topics based on the topic and maps to agents 
    using the source of the topic as the agent key.
    """

TopicSubscription matches messages with an exact topic match and maps them to agents where:

  • The agent type is predefined in the subscription

  • The agent key is derived from the message channel's source

DefaultSubscription

class DefaultSubscription(TopicSubscription):
    """
    The default subscription is designed to be a sensible default for applications 
    that only need global scope for agents.
    """

DefaultSubscription extends TopicSubscription with sensible defaults:

  • Topic defaults to "default"

  • Agent type is automatically detected from the context if not specified

Publishing Messages

The agent runtime provides a publish_message method to send messages to subscribed agents:

async def publish_message(
    self,
    message: Any,
    message_channel: MessageChannel,
    *,
    sender: AgentId | None = None,
    cancellation_token: CancellationToken | None = None,
    message_id: str | None = None,
) -> None:

This method:

  1. Takes a message of any type and a message channel

  2. Optionally accepts a sender ID, cancellation token, and message ID

  3. Delivers the message to all agents that have subscribed to the matching channel

  4. This is a one-way communication method and does not return responses.

Subscription Patterns

AgentOpera supports various subscription patterns similar to those in the Microsoft AutoGen framework:

1. Single-Tenant, Single Topic

All agents operate within a single tenant and communicate via a single topic:

# Create a default subscription for an agent type
await runtime.subscribe(DefaultSubscription(agent_type="agent_type_name"))

# Publish a message to the default topic
await runtime.publish_message(
    message_content,
    DefaultMessageChannel()
)

2. Multi-Tenant, Single Topic

Each tenant has its own isolated topic through the source identifier:

# Create subscriptions for each agent type
await runtime.subscribe(DefaultSubscription(agent_type="agent_type_name"))

# Publish tenant-specific messages
for tenant_id in ["tenant1", "tenant2"]:
    await runtime.publish_message(
        message_content,
        DefaultMessageChannel(source=tenant_id)
    )

3. Single-Tenant, Multiple Topics

Messages are published to different topics within a single tenant:

# Create topic-specific subscriptions
await runtime.subscribe(TopicSubscription(topic="topic1", agent_type="agent_type1"))
await runtime.subscribe(TopicSubscription(topic="topic2", agent_type="agent_type2"))

# Publish to specific topics
await runtime.publish_message(
    message_content,
    MessageChannel(topic="topic1", source="default")
)

4. Multi-Tenant, Multiple Topics

The most flexible pattern combines multiple topics with multiple tenants:

# Create topic-specific subscriptions for each agent type
await runtime.subscribe(TopicSubscription(topic="topic1", agent_type="agent_type1"))
await runtime.subscribe(TopicSubscription(topic="topic2", agent_type="agent_type2"))

# Publish tenant-specific messages to different topics
for tenant_id in ["tenant1", "tenant2"]:
    await runtime.publish_message(
        message_content,
        MessageChannel(topic="topic1", source=tenant_id)
    )

Usage Example

Here's a complete example showing how to set up subscriptions and publish messages using the agent registration pattern:

from agentopera.engine.runtime.agent_engine import AgentEngine
from agentopera.engine.protocol.subscription.default_subscription import DefaultSubscription
from agentopera.engine.protocol.subscription.topic_subscription import TopicSubscription
from agentopera.engine.protocol.subscription.message_channel import MessageChannel, DefaultMessageChannel
from agentopera.types.agent import AgentId

class AnalyzerAgent:
    # Agent implementation
    def __init__(self, name):
        self.name = name

class ProcessorAgent:
    # Agent implementation
    def __init__(self, name):
        self.name = name

async def setup_messaging(runtime: AgentEngine):
    # Register agents using the register method on the agent class
    await AnalyzerAgent.register(
        runtime,
        "analyzer_agent",
        lambda: AnalyzerAgent("analyzer_agent")
    )
    
    await ProcessorAgent.register(
        runtime,
        "processor_agent",
        lambda: ProcessorAgent("processor_agent")
    )
    
    # Add subscriptions for registered agents
    await runtime.subscribe(DefaultSubscription(topic="analyzer_topic", agent_type="analyzer_agent"))
    await runtime.subscribe(TopicSubscription(topic="processor_topic", agent_type="processor_agent"))
    
    # Publish messages to specific agents
    await runtime.publish_message(
        {"status": "processing_complete", "data": {"results": [1, 2, 3]}},
        MessageChannel(topic="analyzer_topic", source="batch_001")
    )
    
    # You can also use DefaultMessageChannel for simpler cases
    await runtime.publish_message(
        "Simple message for processor",
        DefaultMessageChannel(topic="processor_topic", source="batch_001")
    )

In this example:

  1. Agents register themselves using the register class method, which internally configures them with the runtime

  2. Subscriptions are added for each agent type with specific topics

  3. Messages are published to the message channels, which are routed to the appropriate agents based on the subscriptions

This approach follows the pattern used in production code where agents typically implement their own registration logic.

Best Practices

  1. Use meaningful topic names that reflect the nature of the messages being published.

  2. Structure source IDs to represent the specific instances of your application contexts.

  3. Choose the right subscription type:

    • TopicSubscription for exact matching

    • DefaultSubscription for simple use cases

Last updated