Greeter Receiver Cycle

This example demonstrates a simple distributed agent system using agentopera.

It includes:

  • A DistAgentEngine Server that coordinates communication between agents.

  • Two agents: GreeterAgent and ReceiveAgent.

  • A feedback communication cycle between them: AskToGreet → Greeting → ReturnedGreeting → Feedback → ReturnedFeedback.

1️⃣ Start the Distributed Agent Server

This server listens on localhost:50051 and handles all agent communication.

import asyncio
import os
from agentopera.engine.runtime.distributed import DistAgentEngineSrv

async def start_server():
    """Run the distributed agent server on localhost:50051 until interrupted.

    Starts a DistAgentEngineSrv, then waits: on non-Windows platforms via
    the server's own ``stop_when_signal()``, on Windows (``os.name == "nt"``)
    on an event that is never set. The server is always stopped on the way
    out, whether the wait ends normally or with an exception.
    """
    server = DistAgentEngineSrv(address="localhost:50051")
    server.start()
    try:
        if os.name != "nt":
            await server.stop_when_signal()
        else:
            # Nothing ever sets this Event, so the coroutine idles until it is
            # interrupted. Presumably used because signal-based shutdown is
            # not available on Windows -- TODO confirm.
            await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("Stopping service...")
    finally:
        # NOTE(review): this also runs after stop_when_signal() returns;
        # confirm that calling stop() at that point is safe.
        await server.stop()

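Run the call below manually in a separate terminal or notebook; it blocks until the server is stopped. In a notebook, where an event loop is already running, use `await start_server()` instead of `asyncio.run(...)`.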

# Entry point for the server process: runs start_server() on a new event loop
# and blocks until that coroutine returns.
asyncio.run(start_server())

2️⃣ Define Agents and Message Types

Here we define the GreeterAgent and ReceiveAgent, and the data messages they'll exchange.

import asyncio
import logging
from dataclasses import dataclass

from agentopera.engine import (
    ROOT_LOGGER_NAME,
    DefaultSubscription,
    DefaultMessageChannel,
    MessageContext,
    RoutedAgent,
    message_handler,
    try_get_known_serializers_for_type,
)
from agentopera.engine.runtime.distributed import DistAgentEngine


@dataclass
class AskToGreet:
    """Request that starts the cycle; handled by GreeterAgent.on_ask."""
    content: str  # text that on_ask embeds in the outgoing Greeting


@dataclass
class Greeting:
    """Greeting published by GreeterAgent.on_ask; handled by ReceiveAgent.on_greet."""
    content: str  # greeting text


@dataclass
class ReturnedGreeting:
    """Reply published by ReceiveAgent.on_greet; handled by GreeterAgent.on_returned_greet."""
    content: str  # the received greeting text with a "Returned greeting: " prefix


@dataclass
class Feedback:
    """Feedback published by GreeterAgent.on_returned_greet; handled by ReceiveAgent.on_feedback."""
    content: str  # the returned greeting text with a "Feedback: " prefix


@dataclass
class ReturnedFeedback:
    """Final message of the cycle, published by ReceiveAgent.on_feedback.

    No handler for this type is defined in this example.
    """
    content: str  # the received feedback text with a "Returned feedback: " prefix


class ReceiveAgent(RoutedAgent):
    """Agent that answers Greeting and Feedback messages.

    Each handler prints what it received and publishes a wrapped copy of the
    content (ReturnedGreeting / ReturnedFeedback) on "topic_greeter", reusing
    the source and message id of the incoming message.
    """

    def __init__(self) -> None:
        # The string is handed straight to the RoutedAgent constructor.
        super().__init__("Receive Agent")

    @message_handler
    async def on_greet(self, message: Greeting, ctx: MessageContext) -> None:
        """Print the incoming Greeting and publish a ReturnedGreeting."""
        print(f"ReceiveAgent on_greet: {message}, ctx.sender = {ctx.sender}")
        reply = ReturnedGreeting(f"Returned greeting: {message.content}")
        # Reply on the greeter's topic, keeping the incoming channel's source.
        reply_channel = DefaultMessageChannel(
            topic="topic_greeter",
            source=ctx.message_channel.source,
        )
        await self.publish_message(reply, reply_channel, message_id=ctx.message_id)

    @message_handler
    async def on_feedback(self, message: Feedback, ctx: MessageContext) -> None:
        """Print the incoming Feedback and publish a ReturnedFeedback."""
        print(f"ReceiveAgent on_feedback: {message}, ctx.sender = {ctx.sender}")
        reply = ReturnedFeedback(f"Returned feedback: {message.content}")
        # Reply on the greeter's topic, keeping the incoming channel's source.
        reply_channel = DefaultMessageChannel(
            topic="topic_greeter",
            source=ctx.message_channel.source,
        )
        await self.publish_message(reply, reply_channel, message_id=ctx.message_id)


class GreeterAgent(RoutedAgent):
    """Agent that opens the exchange and reacts to the returned greeting.

    Each handler prints what it received and publishes its follow-up message
    (Greeting / Feedback) on "topic_receiver", reusing the source and message
    id of the incoming message.
    """

    def __init__(self) -> None:
        # The string is handed straight to the RoutedAgent constructor.
        super().__init__("Greeter Agent")

    @message_handler
    async def on_ask(self, message: AskToGreet, ctx: MessageContext) -> None:
        """Print the incoming AskToGreet and publish a Greeting."""
        print(f"GreeterAgent on_ask: {message}, ctx.sender = {ctx.sender}")
        greeting = Greeting(f"Hello, {message.content}!")
        # Send on the receiver's topic, keeping the incoming channel's source.
        outgoing_channel = DefaultMessageChannel(
            topic="topic_receiver",
            source=ctx.message_channel.source,
        )
        await self.publish_message(greeting, outgoing_channel, message_id=ctx.message_id)

    @message_handler
    async def on_returned_greet(self, message: ReturnedGreeting, ctx: MessageContext) -> None:
        """Print the incoming ReturnedGreeting and publish a Feedback."""
        print(f"GreeterAgent on_returned_greet: {message}, ctx.sender = {ctx.sender}")
        feedback = Feedback(f"Feedback: {message.content}")
        # Send on the receiver's topic, keeping the incoming channel's source.
        outgoing_channel = DefaultMessageChannel(
            topic="topic_receiver",
            source=ctx.message_channel.source,
        )
        await self.publish_message(feedback, outgoing_channel, message_id=ctx.message_id)

3️⃣ Run the Distributed Engine & Start Agents

In this step, we start the GreeterAgent and ReceiveAgent and send the first message.

async def main():
    """Connect to the server, register both agents, and start the cycle.

    Steps, in order: start a DistAgentEngine pointed at localhost:50051,
    register a serializer for every message type, register and subscribe the
    receiver and greeter agents to their topics, publish the initial
    AskToGreet on "topic_greeter", then wait in ``stop_when_signal()``.
    """
    engine = DistAgentEngine(host_address="localhost:50051")
    await engine.start()

    # Every message type exchanged in this example gets a serializer.
    message_types = (AskToGreet, Greeting, ReturnedGreeting, Feedback, ReturnedFeedback)
    for message_type in message_types:
        serializers = try_get_known_serializers_for_type(message_type)
        engine.register_msg_serializer(serializers)

    await ReceiveAgent.register(engine, "receiver", ReceiveAgent)
    receiver_subscription = DefaultSubscription(topic="topic_receiver", agent_type="receiver")
    await engine.subscribe(receiver_subscription)

    await GreeterAgent.register(engine, "greeter", GreeterAgent)
    greeter_subscription = DefaultSubscription(topic="topic_greeter", agent_type="greeter")
    await engine.subscribe(greeter_subscription)

    # The first message goes to the greeter's topic; GreeterAgent.on_ask
    # handles AskToGreet.
    first_message = AskToGreet("Hello World!")
    first_channel = DefaultMessageChannel(topic="topic_greeter", source="test-user")
    await engine.publish_message(first_message, first_channel, message_id="11111111")

    await engine.stop_when_signal()
# Top-level await only works where an event loop is already running (e.g. a
# notebook cell); in a plain script, call asyncio.run(main()) instead.
await main()

Last updated