Greeter Receiver Cycle
This example demonstrates a simple distributed agent system using agentopera
.
It includes:
A DistAgentEngine Server that coordinates communication between agents.
Two agents:
GreeterAgent
andReceiveAgent
.A feedback communication cycle between them.
1️⃣ Start the Distributed Agent Server
This server listens on localhost:50051
and handles all agent communication.
import asyncio
import os
from agentopera.engine.runtime.distributed import DistAgentEngineSrv
async def start_server():
service = DistAgentEngineSrv(address="localhost:50051")
service.start()
try:
if os.name == "nt":
await asyncio.Event().wait()
else:
await service.stop_when_signal()
except KeyboardInterrupt:
print("Stopping service...")
finally:
await service.stop()
Run this cell manually in a separate notebook or terminal!
asyncio.run(start_server())
2️⃣ Define Agents and Message Types
Here we define the GreeterAgent
and ReceiveAgent
, and the data messages they'll exchange.
import asyncio
import logging
from dataclasses import dataclass
from agentopera.engine import (
ROOT_LOGGER_NAME,
DefaultSubscription,
DefaultMessageChannel,
MessageContext,
RoutedAgent,
message_handler,
try_get_known_serializers_for_type,
)
from agentopera.engine.runtime.distributed import DistAgentEngine
@dataclass
class AskToGreet:
content: str
@dataclass
class Greeting:
content: str
@dataclass
class ReturnedGreeting:
content: str
@dataclass
class Feedback:
content: str
@dataclass
class ReturnedFeedback:
content: str
class ReceiveAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("Receive Agent")
@message_handler
async def on_greet(self, message: Greeting, ctx: MessageContext) -> None:
print(f"ReceiveAgent on_greet: {message}, ctx.sender = {ctx.sender}")
await self.publish_message(
ReturnedGreeting(f"Returned greeting: {message.content}"),
DefaultMessageChannel(topic="topic_greeter", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
@message_handler
async def on_feedback(self, message: Feedback, ctx: MessageContext) -> None:
print(f"ReceiveAgent on_feedback: {message}, ctx.sender = {ctx.sender}")
await self.publish_message(
ReturnedFeedback(f"Returned feedback: {message.content}"),
DefaultMessageChannel(topic="topic_greeter", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
class GreeterAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("Greeter Agent")
@message_handler
async def on_ask(self, message: AskToGreet, ctx: MessageContext) -> None:
print(f"GreeterAgent on_ask: {message}, ctx.sender = {ctx.sender}")
await self.publish_message(
Greeting(f"Hello, {message.content}!"),
DefaultMessageChannel(topic="topic_receiver", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
@message_handler
async def on_returned_greet(self, message: ReturnedGreeting, ctx: MessageContext) -> None:
print(f"GreeterAgent on_returned_greet: {message}, ctx.sender = {ctx.sender}")
await self.publish_message(
Feedback(f"Feedback: {message.content}"),
DefaultMessageChannel(topic="topic_receiver", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
3️⃣ Run the Distributed Engine & Start Agents
In this step, we start the GreeterAgent
and ReceiveAgent
and send the first message.
async def main():
engine = DistAgentEngine(host_address="localhost:50051")
await engine.start()
for t in [AskToGreet, Greeting, ReturnedGreeting, Feedback, ReturnedFeedback]:
engine.register_msg_serializer(try_get_known_serializers_for_type(t))
await ReceiveAgent.register(engine, "receiver", ReceiveAgent)
await engine.subscribe(DefaultSubscription(topic="topic_receiver", agent_type="receiver"))
await GreeterAgent.register(engine, "greeter", GreeterAgent)
await engine.subscribe(DefaultSubscription(topic="topic_greeter", agent_type="greeter"))
await engine.publish_message(
AskToGreet("Hello World!"),
DefaultMessageChannel(topic="topic_greeter", source="test-user"),
message_id="11111111",
)
await engine.stop_when_signal()
await main()
Last updated