Skip to content

[Feat]: support streamable http mcp #8864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openhands/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def on_event(event: Event) -> None:
config.mcp_host, config, None
)
)
# FIXME: OpenHands' SSE server may not be running when CLI mode is started
# FIXME: OpenHands' streamable http server may not be running when CLI mode is started
# if openhands_mcp_server:
# config.mcp.sse_servers.append(openhands_mcp_server)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just remove it :D

Suggested change
# FIXME: OpenHands' streamable http server may not be running when CLI mode is started
# if openhands_mcp_server:
# config.mcp.sse_servers.append(openhands_mcp_server)

config.mcp.stdio_servers.extend(openhands_mcp_stdio_servers)
Expand Down
15 changes: 12 additions & 3 deletions openhands/core/config/mcp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def __eq__(self, other):
and set(self.env.items()) == set(other.env.items())
)

class MCPStreamableHTTPServerConfig(BaseModel):
url: str
api_key: str | None = None


class MCPConfig(BaseModel):
"""Configuration for MCP (Message Control Protocol) settings.
Expand All @@ -65,6 +69,7 @@ class MCPConfig(BaseModel):

sse_servers: list[MCPSSEServerConfig] = Field(default_factory=list)
stdio_servers: list[MCPStdioServerConfig] = Field(default_factory=list)
streamable_http_servers: list[MCPStreamableHTTPServerConfig] = Field(default_factory=list)

model_config = {'extra': 'forbid'}

Expand Down Expand Up @@ -132,6 +137,9 @@ def from_toml_section(cls, data: dict) -> dict[str, 'MCPConfig']:
servers.append(MCPStdioServerConfig(**server))
data['stdio_servers'] = servers

if 'streamable_http_servers' in data:
pass

# Create SSE config if present
mcp_config = MCPConfig.model_validate(data)
mcp_config.validate_servers()
Expand Down Expand Up @@ -169,7 +177,7 @@ def add_search_engine(app_config: 'OpenHandsConfig') -> MCPStdioServerConfig | N
@staticmethod
def create_default_mcp_server_config(
host: str, config: 'OpenHandsConfig', user_id: str | None = None
) -> tuple[MCPSSEServerConfig, list[MCPStdioServerConfig]]:
) -> tuple[MCPStreamableHTTPServerConfig, list[MCPStdioServerConfig]]:
"""
Create a default MCP server configuration.

Expand All @@ -179,12 +187,13 @@ def create_default_mcp_server_config(
Returns:
tuple[MCPSSEServerConfig, list[MCPStdioServerConfig]]: A tuple containing the default SSE server configuration and a list of MCP stdio server configurations
"""
sse_server = MCPSSEServerConfig(url=f'http://{host}/mcp/sse', api_key=None)
stdio_servers = []
search_engine_stdio_server = OpenHandsMCPConfig.add_search_engine(config)
if search_engine_stdio_server:
stdio_servers.append(search_engine_stdio_server)
return sse_server, stdio_servers

streamable_http_server = MCPStreamableHTTPServerConfig(url=f'http://{host}/mcp/mcp', api_key=None)
return streamable_http_server, stdio_servers


openhands_mcp_config_cls = os.environ.get(
Expand Down
2 changes: 1 addition & 1 deletion openhands/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def run_controller(
config.mcp_host, config, None
)
)
# FIXME: OpenHands' SSE server may not be running when headless mode is started
# FIXME: OpenHands' streamable http server may not be running when headless mode is started
# if openhands_mcp_server:
# config.mcp.sse_servers.append(openhands_mcp_server)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# FIXME: OpenHands' streamable http server may not be running when headless mode is started
# if openhands_mcp_server:
# config.mcp.sse_servers.append(openhands_mcp_server)

config.mcp.stdio_servers.extend(openhands_mcp_stdio_servers)
Expand Down
82 changes: 81 additions & 1 deletion openhands/mcp/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import datetime
from contextlib import AsyncExitStack
from typing import Optional

from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from pydantic import BaseModel, Field

from openhands.core.logger import openhands_logger as logger
Expand Down Expand Up @@ -58,14 +60,21 @@ async def connect_with_timeout():
if conversation_id:
headers['X-OpenHands-Conversation-ID'] = conversation_id

# Convert float timeout to datetime.timedelta for consistency
timeout_delta = datetime.timedelta(seconds=timeout)

streams_context = sse_client(
url=server_url,
headers=headers if headers else None,
timeout=timeout,
)
streams = await self.exit_stack.enter_async_context(streams_context)
# For SSE client, we only get read_stream and write_stream (2 values)
read_stream, write_stream = streams
self.session = await self.exit_stack.enter_async_context(
ClientSession(*streams)
ClientSession(
read_stream, write_stream, read_timeout_seconds=timeout_delta
)
)
await self._initialize_and_list_tools()

Expand Down Expand Up @@ -117,6 +126,77 @@ async def call_tool(self, tool_name: str, args: dict):
raise RuntimeError('Client session is not available.')
return await self.session.call_tool(name=tool_name, arguments=args)

async def connect_streamable_http(
self,
server_url: str,
api_key: str | None = None,
conversation_id: str | None = None,
timeout: float = 30.0,
) -> None:
"""Connect to an MCP server using StreamableHTTP transport.

Args:
server_url: The URL of the StreamableHTTP server to connect to.
api_key: Optional API key for authentication.
conversation_id: Optional conversation ID for session tracking.
timeout: Connection timeout in seconds. Default is 30 seconds.
"""
if not server_url:
raise ValueError('Server URL is required.')
if self.session:
await self.disconnect()

try:
# Use asyncio.wait_for to enforce the timeout
async def connect_with_timeout():
headers = (
{
'Authorization': f'Bearer {api_key}',
's': api_key, # We need this for action execution server's MCP Router
'X-Session-API-Key': api_key, # We need this for Remote Runtime
}
if api_key
else {}
)

if conversation_id:
headers['X-OpenHands-Conversation-ID'] = conversation_id

# Convert float timeout to datetime.timedelta
timeout_delta = datetime.timedelta(seconds=timeout)
sse_read_timeout_delta = datetime.timedelta(
seconds=timeout * 10
) # 10x longer for read timeout

streams_context = streamablehttp_client(
url=server_url,
headers=headers if headers else None,
timeout=timeout_delta,
sse_read_timeout=sse_read_timeout_delta,
)
streams = await self.exit_stack.enter_async_context(streams_context)
# For StreamableHTTP client, we get read_stream, write_stream, and get_session_id (3 values)
read_stream, write_stream, _ = streams
self.session = await self.exit_stack.enter_async_context(
ClientSession(
read_stream, write_stream, read_timeout_seconds=timeout_delta
)
)
await self._initialize_and_list_tools()

# Apply timeout to the entire connection process
await asyncio.wait_for(connect_with_timeout(), timeout=timeout)
except asyncio.TimeoutError:
logger.error(
f'Connection to {server_url} timed out after {timeout} seconds'
)
await self.disconnect() # Clean up resources
raise # Re-raise the TimeoutError
except Exception as e:
logger.error(f'Error connecting to {server_url}: {str(e)}')
await self.disconnect() # Clean up resources
raise

async def disconnect(self) -> None:
"""Disconnect from the MCP server and clean up resources."""
if self.session:
Expand Down
42 changes: 39 additions & 3 deletions openhands/mcp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
if TYPE_CHECKING:
from openhands.controller.agent import Agent


from openhands.core.config.mcp_config import (
MCPConfig,
MCPSSEServerConfig,
MCPStreamableHTTPServerConfig,
)
from openhands.core.config.openhands_config import OpenHandsConfig
from openhands.core.logger import openhands_logger as logger
Expand Down Expand Up @@ -48,7 +50,9 @@ def convert_mcp_clients_to_tools(mcp_clients: list[MCPClient] | None) -> list[di


async def create_mcp_clients(
sse_servers: list[MCPSSEServerConfig], conversation_id: str | None = None
sse_servers: list[MCPSSEServerConfig],
streamable_http_servers: list[MCPStreamableHTTPServerConfig],
conversation_id: str | None = None,
) -> list[MCPClient]:
import sys

Expand Down Expand Up @@ -88,13 +92,45 @@ async def create_mcp_clients(
f'Error during disconnect after failed connection: {str(disconnect_error)}'
)

if streamable_http_servers:
for server_url in streamable_http_servers:
logger.info(
f'Initializing MCP agent for {server_url} with Streamable HTTP connection...'
)

client = MCPClient()
try:
await client.connect_streamable_http(
server_url.url,
api_key=server_url.api_key,
conversation_id=conversation_id,
)
# Only add the client to the list after a successful connection
mcp_clients.append(client)
logger.info(f'Connected to MCP server {server_url} via SSE')
except Exception as e:
logger.error(
f'Failed to connect to {server_url}: {str(e)}', exc_info=True
)
try:
await client.disconnect()
except Exception as disconnect_error:
logger.error(
f'Error during disconnect after failed connection: {str(disconnect_error)}'
)
return mcp_clients


async def fetch_mcp_tools_from_config(mcp_config: MCPConfig) -> list[dict]:
async def fetch_mcp_tools_from_config(
mcp_config: MCPConfig, conversation_id: str | None = None
) -> list[dict]:
"""
Retrieves the list of MCP tools from the MCP clients.

Args:
mcp_config: The MCP configuration
conversation_id: Optional conversation ID to associate with the MCP clients

Returns:
A list of tool dictionaries. Returns an empty list if no connections could be established.
"""
Expand All @@ -111,7 +147,7 @@ async def fetch_mcp_tools_from_config(mcp_config: MCPConfig) -> list[dict]:
logger.debug(f'Creating MCP clients with config: {mcp_config}')
# Create clients - this will fetch tools but not maintain active connections
mcp_clients = await create_mcp_clients(
mcp_config.sse_servers,
mcp_config.sse_servers, mcp_config.streamable_http_servers, conversation_id
)

if not mcp_clients:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ async def call_tool_mcp(self, action: MCPAction) -> Observation:
)

# Create clients for this specific operation
mcp_clients = await create_mcp_clients(updated_mcp_config.sse_servers, self.sid)
mcp_clients = await create_mcp_clients(updated_mcp_config.sse_servers, updated_mcp_config.streamable_http_servers, self.sid)

# Call the tool and return the result
# No need for try/finally since disconnect() is now just resetting state
Expand Down
19 changes: 17 additions & 2 deletions openhands/server/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import warnings
from contextlib import asynccontextmanager
from typing import AsyncIterator
Expand Down Expand Up @@ -29,6 +30,20 @@
from openhands.server.routes.trajectory import app as trajectory_router
from openhands.server.shared import conversation_manager

mcp_app = mcp_server.http_app(path='/mcp')


def combine_lifespans(*lifespans):
# Create a combined lifespan to manage multiple session managers
@contextlib.asynccontextmanager
async def combined_lifespan(app):
async with contextlib.AsyncExitStack() as stack:
for lifespan in lifespans:
await stack.enter_async_context(lifespan(app))
yield

return combined_lifespan


@asynccontextmanager
async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
Expand All @@ -40,8 +55,8 @@ async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
title='OpenHands',
description='OpenHands: Code Less, Make More',
version=__version__,
lifespan=_lifespan,
routes=[Mount(path='/mcp', app=mcp_server.sse_app())],
lifespan=combine_lifespans(_lifespan, mcp_app.lifespan),
routes=[Mount(path='/mcp', app=mcp_app)],
)


Expand Down
3 changes: 1 addition & 2 deletions openhands/server/routes/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
)
from openhands.storage.data_models.conversation_metadata import ConversationMetadata

mcp_server = FastMCP('mcp', dependencies=get_dependencies())

mcp_server = FastMCP('mcp', stateless_http=True, dependencies=get_dependencies())

async def save_pr_metadata(
user_id: str, conversation_id: str, tool_result: str
Expand Down
2 changes: 1 addition & 1 deletion openhands/server/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def initialize_agent(
)
)
if openhands_mcp_server:
self.config.mcp.sse_servers.append(openhands_mcp_server)
self.config.mcp.streamable_http_servers.append(openhands_mcp_server)
self.config.mcp.stdio_servers.extend(openhands_mcp_stdio_servers)

# TODO: override other LLM config & agent config groups (#2075)
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/test_mcp_client_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,24 @@ async def mock_slow_context(*args, **kwargs):
# Test with a very short timeout
with pytest.raises(asyncio.TimeoutError):
await client.connect_sse('http://example.com', timeout=0.1)


@pytest.mark.asyncio
async def test_connect_streamable_http_timeout():
"""Test that connect_streamable_http properly times out when server_url is invalid."""
client = MCPClient()

# Create a mock async context manager that simulates a timeout
@asynccontextmanager
async def mock_slow_context(*args, **kwargs):
# This will hang for longer than our timeout
await asyncio.sleep(10.0)
yield (mock.AsyncMock(), mock.AsyncMock(), mock.AsyncMock())

# Patch the streamablehttp_client function to return our slow context manager
with mock.patch(
'openhands.mcp.client.streamablehttp_client', return_value=mock_slow_context()
):
# Test with a very short timeout
with pytest.raises(asyncio.TimeoutError):
await client.connect_streamable_http('http://example.com', timeout=0.1)
4 changes: 2 additions & 2 deletions tests/unit/test_mcp_create_clients_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def connect_sse_with_short_timeout(self, server_url, timeout=30.0):

# Call create_mcp_clients with the invalid URL
start_time = asyncio.get_event_loop().time()
clients = await create_mcp_clients([invalid_url])
clients = await create_mcp_clients([invalid_url], [])
end_time = asyncio.get_event_loop().time()

# Verify that no clients were successfully connected
Expand Down Expand Up @@ -61,7 +61,7 @@ async def connect_sse_with_short_timeout(self, server_url, timeout=30.0):

# Call create_mcp_clients with the unreachable URL
start_time = asyncio.get_event_loop().time()
clients = await create_mcp_clients([unreachable_url])
clients = await create_mcp_clients([unreachable_url], [])
end_time = asyncio.get_event_loop().time()

# Verify that no clients were successfully connected
Expand Down
Loading
Loading