Skip to content

Feat: A travel assistant demo implemented based on Google's official a2a-python SDK. #557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions samples/a2a-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# agent example

The examples under this directory are agent demos implemented with the A2A standard protocol, using Google's official SDK [a2a-python](https://github.com/google/a2a-python).


## Prerequisites

- Python 3.13 or higher
- [UV](https://docs.astral.sh/uv/)
- [a2a-python](https://github.com/google/a2a-python)


## agent directory
- [travel_planner](travel_planner/README.md)


## License

This project is licensed under the terms of the [Apache 2.0 License](/LICENSE).

## Contributing

See [CONTRIBUTING.md](/CONTRIBUTING.md) for contribution guidelines.
37 changes: 37 additions & 0 deletions samples/a2a-python/travel_planner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# travel planner example
> This is a Python implementation that adheres to the A2A (Agent2Agent) protocol.
> It is a travel assistant backed by any OpenAI-compatible model API, capable of providing you with travel planning services.
> A travel assistant demo implemented based on Google's official a2a-python SDK.

## Getting started

1. Update [config.json](config.json) with your own model name, API key, and base URL.
```json

{
"model_name":"qwen3-32b",
"api_key": "sk-xxxx",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
}

```

2. Start the server
```bash
uv run .
```

3. Run the loop client
```bash
uv run loop_client.py
```


## License

This project is licensed under the terms of the [Apache 2.0 License](/LICENSE).

## Contributing

See [CONTRIBUTING.md](/CONTRIBUTING.md) for contribution guidelines.

Empty file.
47 changes: 47 additions & 0 deletions samples/a2a-python/travel_planner/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from agent_executor import TravelPlannerAgentExecutor

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
AgentAuthentication,
AgentCapabilities,
AgentCard,
AgentSkill,
)


if __name__ == '__main__':
    # Imported here (not at module top) because uvicorn is only needed when
    # this file is executed as a script; hoisted above the logic so the
    # dependency is visible before any work is done.
    import uvicorn

    # The single skill this agent advertises to clients.
    skill = AgentSkill(
        id='travel_planner',
        name='travel planner agent',
        description='travel planner',
        tags=['travel planner'],
        examples=['hello', 'nice to meet you!'],
    )

    # Public AgentCard: tells A2A clients where the agent lives, which
    # input/output modes it accepts, and that it supports streaming.
    agent_card = AgentCard(
        name='travel planner Agent',
        description='travel planner',
        url='http://localhost:10001/',
        version='1.0.0',
        defaultInputModes=['text'],
        defaultOutputModes=['text'],
        capabilities=AgentCapabilities(streaming=True),
        skills=[skill],
        authentication=AgentAuthentication(schemes=['public']),
    )

    # Route incoming A2A requests to the travel-planner executor; task state
    # is kept in memory only, so it is lost when the process restarts.
    request_handler = DefaultRequestHandler(
        agent_executor=TravelPlannerAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )

    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )

    uvicorn.run(server.build(), host='0.0.0.0', port=10001)
73 changes: 73 additions & 0 deletions samples/a2a-python/travel_planner/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import json
from collections.abc import AsyncGenerator
from typing import Any

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

class TravelPlannerAgent:
    """Travel planner agent backed by an OpenAI-compatible chat model."""

    def __init__(self):
        """Load ``config.json`` and initialize the chat model.

        Exits the process when the configuration is missing or incomplete,
        since the agent cannot operate without model credentials.
        """
        try:
            with open("config.json") as f:
                config = json.load(f)
            self.model = ChatOpenAI(
                model=config["model_name"],
                base_url=config["base_url"],
                api_key=config["api_key"],
                temperature=0.7  # Control the generation randomness (0-2, higher values indicate greater randomness)
            )
        except FileNotFoundError:
            print("Error: The configuration file config.json cannot be found.")
            exit()
        except KeyError as e:
            print(f"The configuration file is missing required fields: {e}")
            exit()

    async def stream(self, query: str) -> AsyncGenerator[dict[str, Any], None]:
        """Stream the model's response back to the client.

        Yields dicts of the form ``{'content': str, 'done': bool}``; the
        final chunk always has ``done=True``.
        """
        try:
            # Initialize the conversation history (system messages can be added)
            messages = [
                SystemMessage(
                    content="""
                    You are an expert travel assistant specializing in trip planning, destination information,
                    and travel recommendations. Your goal is to help users plan enjoyable, safe, and
                    realistic trips based on their preferences and constraints.

                    When providing information:
                    - Be specific and practical with your advice
                    - Consider seasonality, budget constraints, and travel logistics
                    - Highlight cultural experiences and authentic local activities
                    - Include practical travel tips relevant to the destination
                    - Format information clearly with headings and bullet points when appropriate

                    For itineraries:
                    - Create realistic day-by-day plans that account for travel time between attractions
                    - Balance popular tourist sites with off-the-beaten-path experiences
                    - Include approximate timing and practical logistics
                    - Suggest meal options highlighting local cuisine
                    - Consider weather, local events, and opening hours in your planning

                    Always maintain a helpful, enthusiastic but realistic tone and acknowledge
                    any limitations in your knowledge when appropriate.
                    """
                )
            ]

            # Add the user message to the history.
            messages.append(HumanMessage(content=query))

            # Use the async streaming API so a slow model response does not
            # block the server's event loop (the sync .stream() would).
            async for chunk in self.model.astream(messages):
                # Return the text content block.
                if hasattr(chunk, 'content') and chunk.content:
                    yield {'content': chunk.content, 'done': False}
            yield {'content': '', 'done': True}

        except Exception as e:
            print(f"error:{str(e)}")
            yield {'content': 'Sorry, an error occurred while processing your request.', 'done': True}


49 changes: 49 additions & 0 deletions samples/a2a-python/travel_planner/agent_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from uuid import uuid4
from agent import TravelPlannerAgent

from typing_extensions import override

from a2a.types import (
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
)
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_text_artifact


class TravelPlannerAgentExecutor(AgentExecutor):
    """AgentExecutor that streams travel-planner output as artifact events."""

    def __init__(self):
        self.agent = TravelPlannerAgent()

    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Run the agent on the user's query, enqueuing one artifact-update
        event per streamed chunk."""
        query = context.get_user_input()
        if not context.message:
            raise Exception('No message provided')

        print(f'query:{query}')
        print('answer:')
        async for chunk in self.agent.stream(query):
            text = chunk['content']
            print(text)
            event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    contextId=context.context_id,
                    taskId=context.task_id,
                    artifact=new_text_artifact(
                        name='current_result',
                        text=text,
                    ),
                )
            )

    @override
    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Cancellation is not supported by this executor."""
        raise Exception('cancel not supported')
5 changes: 5 additions & 0 deletions samples/a2a-python/travel_planner/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"model_name":"qwen3-32b",
"api_key": "sk-xxxx",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
}
61 changes: 61 additions & 0 deletions samples/a2a-python/travel_planner/loop_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from a2a.client import A2AClient
from typing import Any
import httpx
from uuid import uuid4
import asyncio
from a2a.types import (
MessageSendParams,
SendStreamingMessageRequest,
)

def print_welcome_message() -> None:
    """Print the greeting and usage hint shown when the client starts."""
    banner = (
        "Welcome to the generic A2A client!\n"
        "Please enter your query (type 'exit' to quit):"
    )
    print(banner)

def get_user_query() -> str:
    """Read a single line of user input, shown after a '> ' prompt."""
    prompt = "\n> "
    return input(prompt)

async def interact_with_server(client: A2AClient) -> None:
    """Read queries in a loop, streaming each answer until the user exits."""
    while True:
        query = get_user_query()
        if query.lower() == 'exit':
            print("bye!~")
            return

        # Minimal A2A message payload: one user-role text part with a
        # fresh message id per request.
        payload: dict[str, Any] = {
            'message': {
                'role': 'user',
                'parts': [{'type': 'text', 'text': query}],
                'messageId': uuid4().hex,
            },
        }

        try:
            request = SendStreamingMessageRequest(
                params=MessageSendParams(**payload)
            )
            async for chunk in client.send_message_streaming(request):
                print(get_response_text(chunk), end='', flush=True)
                await asyncio.sleep(0.1)
        except Exception as e:
            print(f"An error occurred: {e}")

def get_response_text(chunk) -> str:
    """Extract the text of the first artifact part from a streaming chunk.

    Returns an empty string for chunks that carry no artifact text (for
    example task-status updates), instead of raising KeyError and aborting
    the caller's streaming loop.
    """
    data = chunk.model_dump(mode='json', exclude_none=True)
    try:
        return data['result']['artifact']['parts'][0]['text']
    except (KeyError, IndexError, TypeError):
        # Not an artifact-update event; nothing printable in it.
        return ''


async def main() -> None:
    """Connect to the local agent's card endpoint and run the chat loop."""
    print_welcome_message()
    async with httpx.AsyncClient() as http:
        a2a_client = await A2AClient.get_client_from_agent_card_url(
            http, 'http://localhost:10001'
        )
        await interact_with_server(a2a_client)


# Script entry point: start the interactive client loop.
if __name__ == '__main__':
    asyncio.run(main())
25 changes: 25 additions & 0 deletions samples/a2a-python/travel_planner/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[project]
name = "travel_planner"
version = "0.1.0"
description = "travel planner agent example that streams responses as artifact updates"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"a2a-sdk>=0.2.0",
"click>=8.1.8",
"dotenv>=0.9.9",
"httpx>=0.28.1",
"pydantic>=2.11.4",
"python-dotenv>=1.1.0",
"langchain-core>=0.2.31",
"langchain-openai>=0.1.26",
"langchain>=0.1.22",
"uvicorn>=0.34.2"
]

[tool.hatch.build.targets.wheel]
packages = ["."]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Loading