Skip to content

feat: add Restate agent example #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ The Agent2Agent (A2A) protocol facilitates communication between independent AI
* [LlamaIndex](/samples/python/agents/llama_index_file_chat/README.md)
* [Marvin](/samples/python/agents/marvin/README.md)
* [Semantic Kernel](/samples/python/agents/semantickernel/README.md)
* [AG2 + MCP](/samples/python/agents/ag2/README.md)
* [AG2 + MCP](/samples/python/agents/ag2/README.md)
* [Restate](/samples/python/agents/restate/README.md)
* 📑 Review key topics to understand protocol details
* [A2A and MCP](https://google.github.io/A2A/topics/a2a_and_mcp/)
* [Agent Discovery](https://google.github.io/A2A/topics/agent_discovery/)
Expand Down
83 changes: 83 additions & 0 deletions samples/python/agents/restate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
## Resilient Agents with Restate

This sample uses [Restate](https://ai.restate.dev/) and the Agent Development Kit (ADK) to create a resilient "Expense Reimbursement" agent that is hosted as an A2A server.

Restate lets you build resilient applications easily. It provides a distributed durable version of your everyday building blocks.

In this example, Restate acts as a scalable, resilient task orchestrator that speaks the A2A protocol and gives you:
- 🔁 **Automatic retries** - Handles LLM API downtime, timeouts, and infrastructure failures
- 🔄 **Smart recovery** - Preserves progress across failures without duplicating work
- ⏱️ **Persistent task handles** - Tracks progress across failures, time, and processes
- 🎮 **Task control** - Cancel tasks, query status, re-subscribe to ongoing tasks
- 🧠 **Idempotent submission** - Automatic deduplication based on task ID
- 🤖 **Agentic workflows** - Build resilient agents with human-in-the-loop and parallel tool execution
- 💾 **Durable state** - Maintain consistent agent state across infrastructure events
- 👀 **Full observability** - Line-by-line execution tracking with built-in audit trail
- ☁️️ **Easy to self-host** - or connect to Restate Cloud

<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/a2a/a2a.png" width="600px"/>

This agent takes text requests from the client and, if any details are missing, returns a webform for the client (or its user) to fill out.
After the client fills out the form, the agent will complete the task.

## Prerequisites

- Python 3.12 or higher
- [UV](https://docs.astral.sh/uv/)
- Access to an LLM and API Key


## Running the Sample

1. Navigate to the samples directory:
```bash
cd samples/python/agents/restate
```
2. Create an environment file with your API key:

```bash
echo "GOOGLE_API_KEY=your_api_key_here" > .env
```

4. Run the A2A server and agent:
```bash
uv run .
```

6. Start the Restate Server with Docker ([for other options check the docs](https://docs.restate.dev/develop/local_dev#running-restate-server--cli-locally)).

```shell
docker run -p 8080:8080 -p 9070:9070 \
--add-host=host.docker.internal:host-gateway \
-e 'RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT=5min' \
-e 'RESTATE_WORKER__INVOKER__ABORT_TIMEOUT=5min' \
docker.restate.dev/restatedev/restate:latest
```

Let Restate know where the A2A server is running:
```shell
docker run -it --network=host docker.restate.dev/restatedev/restate-cli:latest \
deployments register http://host.docker.internal:9080/restate/v1
```

5. In a separate terminal, run the A2A client:
```
# Connect to the agent (specify the agent URL with correct port)
cd samples/python/hosts/cli
uv run . --agent http://localhost:9080

# If you changed the port when starting the agent, use that port instead
# uv run . --agent http://localhost:YOUR_PORT
```

6. Send requests with the A2A client like: `Reimburse my flight of 700 USD`

Open the Restate UI ([http://localhost:9070](http://localhost:9070)) to see the task execution log and the task state.

<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/a2a/journal.png" width="900px" alt="Example of Restate journal view"/>
<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/a2a/state.png" width="900px" alt="Example of Restate state view"/>

# Learn more
- [Restate Website](https://restate.dev/)
- [Restate Documentation](https://docs.restate.dev/)
- [Restate GitHub repo](https://github.com/restatedev/restate)
75 changes: 75 additions & 0 deletions samples/python/agents/restate/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""A an example of serving a resilient agent using restate.dev"""

import os

import restate

from agent import ReimbursementAgent
from common.types import (
AgentCapabilities,
AgentCard,
AgentSkill,
MissingAPIKeyError,
)
from dotenv import load_dotenv
from fastapi import FastAPI
from middleware import AgentMiddleware


load_dotenv()

RESTATE_HOST = os.getenv('RESTATE_HOST', 'http://localhost:8080')

AGENT_CARD = AgentCard(
name='ReimbursementAgent',
description='This agent handles the reimbursement process for the employees given the amount and purpose of the reimbursement.',
url=RESTATE_HOST,
version='1.0.0',
defaultInputModes=ReimbursementAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=ReimbursementAgent.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id='process_reimbursement',
name='Process Reimbursement Tool',
description='Helps with the reimbursement process for users given the amount and purpose of the reimbursement.',
tags=['reimbursement'],
examples=[
'Can you reimburse me $20 for my lunch with the clients?'
],
)
],
)

REIMBURSEMENT_AGENT = AgentMiddleware(AGENT_CARD, ReimbursementAgent())

app = FastAPI()


@app.get('/.well-known/agent.json')
async def agent_json():
"""Serve the agent card"""
return REIMBURSEMENT_AGENT.agent_card_json


app.mount('/restate/v1', restate.app(REIMBURSEMENT_AGENT))


def main():
"""Serve the agent at a specified port using hypercorn."""
import asyncio

import hypercorn
import hypercorn.asyncio

if not os.getenv('GOOGLE_API_KEY'):
raise MissingAPIKeyError('GOOGLE_API_KEY environment variable not set.')

port = os.getenv('AGENT_PORT', '9080')
conf = hypercorn.Config()
conf.bind = [f'0.0.0.0:{port}']
asyncio.run(hypercorn.asyncio.serve(app, conf))


if __name__ == '__main__':
main()
228 changes: 228 additions & 0 deletions samples/python/agents/restate/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
"""An agent that handles reimbursement requests. Pretty much a copy of the
reimbursement agent from this repo, just made the tools a bit more interesting.
"""

import json
import logging
import random

from typing import Any, Optional

from agents.restate.middleware import AgentInvokeResult
from common.types import TextPart
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.tool_context import ToolContext
from google.genai import types


logger = logging.getLogger(__name__)


# Local cache of created request_ids for demo purposes.
request_ids = set()


def create_request_form(
date: Optional[str] = None,
amount: Optional[str] = None,
purpose: Optional[str] = None,
) -> dict[str, Any]:
"""Create a request form for the employee to fill out.

Args:
date (str): The date of the request. Can be an empty string.
amount (str): The requested amount. Can be an empty string.
purpose (str): The purpose of the request. Can be an empty string.

Returns:
dict[str, Any]: A dictionary containing the request form data.
"""
logger.info('Creating reimbursement request')
request_id = 'request_id_' + str(random.randint(1000000, 9999999))
request_ids.add(request_id)
reimbursement = {
'request_id': request_id,
'date': '<transaction date>' if not date else date,
'amount': '<transaction dollar amount>' if not amount else amount,
'purpose': (
'<business justification/purpose of the transaction>'
if not purpose
else purpose
),
}
logger.info('Reimbursement request created: %s', json.dumps(reimbursement))

return reimbursement


def return_form(
form_request: dict[str, Any],
tool_context: ToolContext,
instructions: Optional[str] = None,
) -> dict[str, Any]:
"""Returns a structured json object indicating a form to complete.

Args:
form_request (dict[str, Any]): The request form data.
tool_context (ToolContext): The context in which the tool operates.
instructions (str): Instructions for processing the form. Can be an empty string.

Returns:
dict[str, Any]: A JSON dictionary for the form response.
"""
logger.info('Creating return form')
if isinstance(form_request, str):
form_request = json.loads(form_request)

form_dict = {
'type': 'form',
'form': {
'type': 'object',
'properties': {
'date': {
'type': 'string',
'format': 'date',
'description': 'Date of expense',
'title': 'Date',
},
'amount': {
'type': 'string',
'format': 'number',
'description': 'Amount of expense',
'title': 'Amount',
},
'purpose': {
'type': 'string',
'description': 'Purpose of expense',
'title': 'Purpose',
},
'request_id': {
'type': 'string',
'description': 'Request id',
'title': 'Request ID',
},
},
'required': list(form_request.keys()),
},
'form_data': form_request,
'instructions': instructions,
}
logger.info('Return form created: %s', json.dumps(form_dict))
return json.dumps(form_dict)


async def reimburse(request_id: str) -> dict[str, Any]:
"""Reimburse the amount of money to the employee for a given request_id."""
logger.info('Starting reimbursement: %s', request_id)
if request_id not in request_ids:
return {
'request_id': request_id,
'status': 'Error: Invalid request_id.',
}
logger.info('Reimbursement approved: %s', request_id)
return {'request_id': request_id, 'status': 'approved'}


class ReimbursementAgent:
"""An agent that handles reimbursement requests."""

SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']

def __init__(self):
self._agent = self._build_agent()
self._user_id = 'remote_agent'
self._runner = Runner(
app_name=self._agent.name,
agent=self._agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)

async def invoke(self, query, session_id) -> AgentInvokeResult:
logger.info('Invoking LLM')
session = self._runner.session_service.get_session(
app_name=self._agent.name,
user_id=self._user_id,
session_id=session_id,
)
content = types.Content(
role='user', parts=[types.Part.from_text(text=query)]
)
if session is None:
self._runner.session_service.create_session(
app_name=self._agent.name,
user_id=self._user_id,
state={},
session_id=session_id,
)

events = []
async for event in self._runner.run_async(
user_id=self._user_id,
session_id=session_id,
new_message=content,
):
events.append(event)

logger.info('LLM response: %s', events)
if not events or not events[-1].content or not events[-1].content.parts:
return AgentInvokeResult(
parts=[TextPart(text='')],
require_user_input=False,
is_task_complete=True,
)
return AgentInvokeResult(
parts=[
TextPart(
text='\n'.join(
[p.text for p in events[-1].content.parts if p.text]
)
)
],
require_user_input=False,
is_task_complete=True,
)

def _build_agent(self) -> LlmAgent:
"""Builds the LLM agent for the reimbursement agent."""
return LlmAgent(
model='gemini-2.0-flash-001',
name='reimbursement_agent',
description=(
'This agent handles the reimbursement process for the employees'
' given the amount and purpose of the reimbursement.'
),
instruction="""
You are an agent who handle the reimbursement process for employees.

When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
1. 'Date': the date of the transaction.
2. 'Amount': the dollar amount of the transaction.
3. 'Business Justification/Purpose': the reason for the reimbursement.

Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.
Clearly let the user know which fields are required and missing.

Once you received the filled-out form back from the user, you should then check the form contains all required information:
1. 'Date': the date of the transaction.
2. 'Amount': the value of the amount of the reimbursement being requested.
3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.

If you don't have all of the information, you should reject the request directly by calling the request_form method, providing the missing fields.


For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
* In your response, you should include the request_id and the status of the reimbursement request.

""",
tools=[
create_request_form,
reimburse,
return_form,
],
)
Loading