support MCP session management #3803

Merged: 4 commits, Apr 30, 2025

zane-neo
Collaborator

Description

This PR adds MCP session management to fix an issue in clustered deployments: a session could be lost when the client sends a message through a load balancer or to an arbitrary coordinator node.
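
To make the failure mode concrete, here is a minimal sketch of the routing idea, assuming each session record stores the ID of the node that holds the client's persistent connection. `SessionRouter`, `Route`, and the in-memory map are hypothetical names used only for illustration; they are not taken from the PR, which keeps session records in an index and dispatches through a transport action.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: decide where an MCP message for a given session must be handled.
final class SessionRouter {
    enum Route { LOCAL, REMOTE, SESSION_NOT_FOUND, NODE_GONE }

    // sessionId -> ID of the node holding the client's persistent connection.
    // An in-memory map stands in for the session index (assumption for this sketch).
    private final Map<String, String> sessionToNode = new ConcurrentHashMap<>();
    private final String localNodeId;

    SessionRouter(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    void register(String sessionId, String nodeId) {
        sessionToNode.put(sessionId, nodeId);
    }

    Route route(String sessionId, Set<String> liveNodeIds) {
        String nodeId = sessionToNode.get(sessionId);
        if (nodeId == null) {
            return Route.SESSION_NOT_FOUND;
        }
        if (!liveNodeIds.contains(nodeId)) {
            // The owning node has left the cluster, so the connection is gone.
            return Route.NODE_GONE;
        }
        return nodeId.equals(localNodeId) ? Route.LOCAL : Route.REMOTE;
    }
}
```

Without such a lookup, a message that lands on a node other than the one holding the connection has no channel to answer on, which is the lost-session case described above.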

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


@Override
public void handleException(TransportException e) {
System.out.println("got exception:" + e.getMessage());
Collaborator

Remove debugging code?

Collaborator Author

Done

@Override
public void handleException(TransportException e) {
- System.out.println("got exception:" + e.getMessage());
+ log.error("got exception: ", e);
Collaborator

"got exception" is too terse. Please make the error message more readable.

Collaborator Author

Changed.

} catch (IOException ex) {
- log.error("Failed to send exception response to client during message handling due to IOException");
+ log.error("Failed to get the session management index result with sessionId: {}", sessionId);
Collaborator

The error message is misleading. The exception here is a failure to send the exception response to the client, so I think the original message makes sense.

This error message should be added after line 235.

Collaborator Author

This is the error message for when the getIndex request fails, so we need to change the message here.


try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
Map<String, DiscoveryNode> nodes = new HashMap<>(clusterService.state().nodes().getNodes());
nodes.remove(clusterService.localNode().getId());
Collaborator

Why remove the local node? I see this transport action will be used in:

if (clusterService.localNode().getId().equals(nodeId)) {
     client
            .execute(
                MLMcpMessageAction.INSTANCE,
                new MLMcpMessageRequest(sessionId, requestBody),
                actionListener
            );

Can you add some comments?

Contributor

Curious about the same. I see that we always pick the first node:
clusterService.state().nodes().getNodes().get(nodes.keySet().stream().findFirst().get())

Collaborator Author

This is test code; removing it.

@Override
protected void doExecute(Task task, ActionRequest request, ActionListener<AcknowledgedResponse> listener) {
MLMcpMessageRequest mlMcpMessageRequest = MLMcpMessageRequest.fromActionRequest(request);
final StreamingRestChannel channel = McpAsyncServerHolder.CHANNELS.get(mlMcpMessageRequest.getSessionId());
Collaborator

Why use a REST channel in a transport action? Is this OK?

Collaborator Author

REST channels here represent persistent connections, so it's OK to use them anywhere, but we need somewhere to close them. I will add this logic.

@@ -21,6 +26,8 @@ public class McpAsyncServerHolder {
new ObjectMapper()
);

public static Map<String, StreamingRestChannel> CHANNELS = new ConcurrentHashMap<>();
Collaborator

When will the session be removed? I don't see any removal logic.

Should we close the StreamingRestChannel, and if so, when?

Collaborator Author

Based on the lifecycle of the MCP protocol, we should close the channel when the client disconnects, but I'm not sure whether we have a way to listen for this event. This doesn't have a big impact for now, so we can deprioritize it.
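
For reference, one possible shape for the missing removal logic is sketched below. It assumes the channel type can be adapted to `AutoCloseable` and that some disconnect signal eventually calls `closeSession`; both are assumptions, and `ChannelRegistry` is a hypothetical name, not a class from this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a registry that owns channel cleanup so map entries cannot leak.
final class ChannelRegistry<C extends AutoCloseable> {
    private final Map<String, C> channels = new ConcurrentHashMap<>();

    void register(String sessionId, C channel) {
        channels.put(sessionId, channel);
    }

    C get(String sessionId) {
        return channels.get(sessionId);
    }

    // Removes the entry first so later lookups no longer see the channel, then closes it.
    // Returns false when the session is unknown (for example, already closed).
    boolean closeSession(String sessionId) {
        C channel = channels.remove(sessionId);
        if (channel == null) {
            return false;
        }
        try {
            channel.close();
        } catch (Exception e) {
            // Best effort: the entry is already gone from the map.
        }
        return true;
    }

    int size() {
        return channels.size();
    }
}
```

Keeping removal and close in one method means a public static map is no longer needed, and calling `closeSession` twice for the same session is harmless.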

Collaborator

ylwu-amzn commented Apr 29, 2025

Have you addressed all the comments from the last PR, #3781?

#3781 (comment): I don't see a setting to enable or disable this experimental feature.

I suggest replying on the last PR for any comment that is already addressed in this PR.

}
);
if (clusterService.localNode().getId().equals(nodeId)) {
client
Collaborator

Do we need this? I see the same code in the else block.

Collaborator Author

I didn't see the same code in the else block.

Collaborator

If you check the first commit, you can see the same code:

                                        client
                                            .execute(
                                                MLMcpMessageAction.INSTANCE,
                                                new MLMcpMessageRequest(sessionId, requestBody),
                                                actionListener
                                            );

Collaborator

ylwu-amzn, Apr 30, 2025

I see you removed that code in the second commit for the (clusterService.localNode().getId().equals(nodeId)) case. I think that code is not needed for the local node case, right?

Collaborator Author

At first I thought you meant the clusterService.localNode().getId().equals(nodeId) in the else block. I removed the part you mentioned; it's not needed.

String result = String.format("/sse/message?sessionId=%s", sessionId);
return Mono.just(createHttpChunk(ENDPOINT_EVENT_TYPE, result));
ActionListener<IndexResponse> actionListener = ActionListener.wrap(r -> {
if (r != null && r.status() == RestStatus.CREATED) {
Contributor

what happens if the status is not CREATED here?

Collaborator Author

Good catch, added else logic.
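
As an illustration of the else branch being discussed, the sketch below reports a failure whenever the index response is missing or its status is not CREATED, so the caller is never left waiting. `IndexResultHandler`, its callbacks, and the failure message are hypothetical; only the endpoint format is taken from the snippet above.

```java
import java.util.function.Consumer;

// Hypothetical sketch: exactly one callback fires for every outcome of the index request.
final class IndexResultHandler {
    static final int CREATED = 201; // HTTP status code for "Created"

    static void handle(
        Integer status,
        String sessionId,
        Consumer<String> onSuccess,
        Consumer<Exception> onFailure
    ) {
        if (status != null && status == CREATED) {
            // Session record stored: hand the message endpoint back to the client.
            onSuccess.accept(String.format("/sse/message?sessionId=%s", sessionId));
        } else {
            // Without this branch a non-CREATED result would be silently dropped.
            onFailure.accept(
                new IllegalStateException("Failed to create session " + sessionId + ", status: " + status)
            );
        }
    }
}
```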

"node_id": {
"type": "keyword"
},
"status": {
Contributor

What are the expected values for status? I see active and created being used.

Collaborator Author

The values are active and inactive. The code currently uses only active; created is the REST response status.


@Override
public String executor() {
return SAME;
Contributor

curious, what does this achieve?

Collaborator Author

It determines which thread will be used to handle the transport response.
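
A small plain-Java sketch of what "same thread" means may help here. It does not use the OpenSearch thread pool; `ExecutorDemo` and `SAME_THREAD` are hypothetical names, and the direct executor only approximates the behavior that returning SAME requests, namely running the handler on the thread that delivers the response instead of handing it to another pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: compare a direct ("same thread") executor with a pooled one.
final class ExecutorDemo {
    // Runs each task on whichever thread calls execute(); no hand-off happens.
    static final Executor SAME_THREAD = Runnable::run;

    // Returns the name of the thread that actually ran the task.
    static String threadNameWhenRunOn(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }
}
```

The trade-off is the usual one: skipping the hand-off avoids a context switch, but the handler then has to stay cheap because it occupies the delivering thread while it runs.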

channel.sendResponse(new BytesRestResponse(channel, new Exception(e)));
listener.onFailure(new Exception(e));
} catch (IOException ex) {
log.error("Failed to send exception response to client during message handling due to IOException");
Contributor

log the exception

Collaborator Author

it's logged.

Contributor

Sorry, I meant log the actual exception: log.error("...", ex)

Anyway, it's a nit; this can be handled later.

Collaborator Author

Oh, I meant that I'd add this log, and I've added it in the commit addressing the comments.
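
The difference between the two forms is whether the log record carries the throwable, and with it the stack trace. The sketch below shows this with `java.util.logging`, swapped in only to keep the example self-contained; the PR's own logger uses `log.error(...)`, and `LogWithCause` is a hypothetical name.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: the same message logged without and with its cause.
final class LogWithCause {
    private static final String MESSAGE = "Failed to send exception response to client";

    // The record carries only the text; the stack trace is lost.
    static void messageOnly(Logger log) {
        log.log(Level.SEVERE, MESSAGE);
    }

    // The record carries the throwable, so handlers can print the stack trace.
    static void withCause(Logger log, IOException ex) {
        log.log(Level.SEVERE, MESSAGE, ex);
    }
}
```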

String nodeId = String.valueOf(r.getSourceAsMap().get("node_id"));
DiscoveryNode node = clusterService.state().getNodes().getNodes().get(nodeId);
if (node == null) {
log.error("The node:{} is no longer in the current cluster, can not handle the mcp request", r.getId());
Contributor

Should this be nodeId and not r.getId()?

Collaborator Author

Changed

Comment on lines 170 to 184
log.debug("MCP request has been dispatched to corresponding node and handled successfully!");
if (requestBody.contains("notifications/initialized")) {
log
.debug(
"Starting to send OK response for notifications/initialized request in coordinator node"
);
channel.sendChunk(createInitializedNotificationRes());
}
}
},
e -> {
log
.error(
"MCP request has been dispatched to corresponding node but peer node failed to handle it",
e
Contributor

let's log the nodeId here?

Collaborator Author

done

ylwu-amzn previously approved these changes Apr 30, 2025
Collaborator

ylwu-amzn left a comment

Approved to unblock RC2. Please address all comments in a new PR.

ylwu-amzn
Collaborator

CI failed

Suite: Test class org.opensearch.ml.rest.mcpserver.RestMLRegisterMcpToolsActionTests
  2> REPRODUCE WITH: ./gradlew ':opensearch-ml-plugin:test' --tests "org.opensearch.ml.rest.mcpserver.RestMLRegisterMcpToolsActionTests.test_routes" -Dtests.seed=9611987A44BAA22D -Dtests.security.manager=false -Dtests.locale=fr-DZ -Dtests.timezone=SystemV/EST5EDT -Druntime.java=23
  2> java.lang.AssertionError: expected:<[POST /_plugins/_ml/mcp_tools/_register]> but was:<[POST /_plugins/_ml/mcp/tools/_register]>
        at __randomizedtesting.SeedInfo.seed([9611987A44BAA22D:DA045D84C754C8A3]:0)
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.junit.Assert.assertEquals(Assert.java:146)
        at org.opensearch.ml.rest.mcpserver.RestMLRegisterMcpToolsActionTests.test_routes(RestMLRegisterMcpToolsActionTests.java:172)

ylwu-amzn previously approved these changes Apr 30, 2025
zane-neo
Collaborator Author

Let's try to take care of these comments since we cannot modify them later (will keep adding more as I see them):

Additionally, for the comments that we don't address, can you create issues to track them?

These comments have been addressed in this PR.

@@ -92,26 +93,26 @@ public static McpTool parse(XContentParser parser) throws IOException {
break;
Contributor

should these fields also be changed?

Collaborator Author

Which fields? Not following.

Contributor

Lines 85-89

Collaborator

ylwu-amzn left a comment

Approving to unblock 3.0 RC2.

@zane-neo zane-neo merged commit 7c05295 into opensearch-project:main Apr 30, 2025
6 of 10 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Apr 30, 2025
* support MCP session management

Signed-off-by: zane-neo <[email protected]>

* Addressing comments

Signed-off-by: zane-neo <[email protected]>

* add feature flag for mcp server and renaming mcp connector feature flag

Signed-off-by: zane-neo <[email protected]>

* Address critical comments in #3781

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
(cherry picked from commit 7c05295)
zane-neo added a commit that referenced this pull request Apr 30, 2025
* support MCP session management

* Addressing comments

* add feature flag for mcp server and renaming mcp connector feature flag

* Address critical comments in #3781

---------

(cherry picked from commit 7c05295)

Signed-off-by: zane-neo <[email protected]>
Co-authored-by: zane-neo <[email protected]>
dhrubo-os added a commit that referenced this pull request May 8, 2025
* [BUG] Agent Framework: Handle model response when toolUse is not accompanied by text (#3755)

* fix: handle model response when toolUse is not accompanied by text

Signed-off-by: Pavan Yekbote <[email protected]>

* feat: add test case for parseLLMOutput

Signed-off-by: Pavan Yekbote <[email protected]>

---------

Signed-off-by: Pavan Yekbote <[email protected]>

* [BUG] Allow user to control react agent max_interations value to prevent empty response (#3756)

* fix: expose max_iteration for react

Signed-off-by: Pavan Yekbote <[email protected]>

* fix: defaults for agent execution and differentiate between step and step result

Signed-off-by: Pavan Yekbote <[email protected]>

* fix: return react agent id in agent response to expose more details

Signed-off-by: Pavan Yekbote <[email protected]>

* spotless

Signed-off-by: Pavan Yekbote <[email protected]>

* fix: remove test prompt from react system prompt

Signed-off-by: Pavan Yekbote <[email protected]>

* refactor: rename parameters exposed to user to executor

Signed-off-by: Pavan Yekbote <[email protected]>

* fix: give user complete control over planner system prompt

Signed-off-by: Pavan Yekbote <[email protected]>

---------

Signed-off-by: Pavan Yekbote <[email protected]>

* Clean up JSM from MCP (#3773)

Signed-off-by: rithin-pullela-aws <[email protected]>

* [Bug] ListTools call does not return tool attributes (#3785)

* initial commit for MCP server in OpenSearch (#3781)

* initial commit for MCP server in OpenSearch

Signed-off-by: zane-neo <[email protected]>

* Make change to support register or remove tools across cluster

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* fix UT failure caused by code change

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* add license header

Signed-off-by: zane-neo <[email protected]>

* fix notifications initialized not respond issue

Signed-off-by: zane-neo <[email protected]>

* fix minor issues and add UTs

Signed-off-by: zane-neo <[email protected]>

* Add more UTs

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* Remove beta1 qualifier (#3794) (#3795)

(cherry picked from commit 3f503f1)

Signed-off-by: Peter Zhu <[email protected]>
Co-authored-by: Peter Zhu <[email protected]>

* [AUTO] Increment version to 3.1.0-SNAPSHOT (#3789)

* Increment version to 3.1.0-SNAPSHOT

Signed-off-by: opensearch-ci-bot <[email protected]>

* Update build.gradle

Signed-off-by: Peter Zhu <[email protected]>

---------

Signed-off-by: opensearch-ci-bot <[email protected]>
Signed-off-by: Peter Zhu <[email protected]>
Co-authored-by: opensearch-ci-bot <[email protected]>
Co-authored-by: Peter Zhu <[email protected]>

* add release note for 3.0 (#3792)

Signed-off-by: Mingshi Liu <[email protected]>

* support MCP session management (#3803)

* support MCP session management

Signed-off-by: zane-neo <[email protected]>

* Addressing comments

Signed-off-by: zane-neo <[email protected]>

* add feature flag for mcp server and renaming mcp connector feature flag

Signed-off-by: zane-neo <[email protected]>

* Address critical comments in #3781

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* upgrade http client to version align with core (#3809)

* upgrade http client to versoin align with core

Signed-off-by: zane-neo <[email protected]>

* upgrade httpclient-h2 to correct versiono

Signed-off-by: zane-neo <[email protected]>

* use placeholder approach

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* support customized message endpoint and addressing comments (#3810)

* support customized message endpoint and addressing comments

Signed-off-by: zane-neo <[email protected]>

* fix UT failures

Signed-off-by: zane-neo <[email protected]>

* add files to jacoco exception

Signed-off-by: zane-neo <[email protected]>

* fix tool name issue and optimize register tool api

Signed-off-by: zane-neo <[email protected]>

* fix schema not parsed correctly issue and NPE when parameters is null

Signed-off-by: zane-neo <[email protected]>

* fix failure UT

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* excluding circuit breaker for Agent (#3814)

Signed-off-by: Dhrubo Saha <[email protected]>

* change release note (#3811)

* change release note

Signed-off-by: zane-neo <[email protected]>

* Update opensearch-ml-common.release-notes-3.0.0.0.md

* Update opensearch-ml-common.release-notes-3.0.0.0.md

* Update opensearch-ml-common.release-notes-3.0.0.0.md

---------

Signed-off-by: zane-neo <[email protected]>
Co-authored-by: Peter Zhu <[email protected]>

* Downgrade MCP version to 0.9 (#3821)

Signed-off-by: rithin-pullela-aws <[email protected]>

* remove libs folder (#3824)

Signed-off-by: Yaliang Wu <[email protected]>

* add more logging to deploy/undeploy flows for better debugging (#3825)

* add more logging to deploy/undeploy flows for better debugging

Signed-off-by: Bhavana Goud Ramaram <[email protected]>

* Fix python client not able to connect to MCP server issue (#3822)

Signed-off-by: zane-neo <[email protected]>
Co-authored-by: Dhrubo Saha <[email protected]>

* exclude trusted connector check for hidden model (#3838)

Signed-off-by: Dhrubo Saha <[email protected]>

* adding tenantId to the connector executor when this is inline connector (#3837)

* adding tenantId to the connector executor when this is inline connector

Signed-off-by: Dhrubo Saha <[email protected]>

* added more unit tests

Signed-off-by: Dhrubo Saha <[email protected]>

---------

Signed-off-by: Dhrubo Saha <[email protected]>

---------

Signed-off-by: Pavan Yekbote <[email protected]>
Signed-off-by: rithin-pullela-aws <[email protected]>
Signed-off-by: zane-neo <[email protected]>
Signed-off-by: Peter Zhu <[email protected]>
Signed-off-by: opensearch-ci-bot <[email protected]>
Signed-off-by: Mingshi Liu <[email protected]>
Signed-off-by: Dhrubo Saha <[email protected]>
Signed-off-by: Yaliang Wu <[email protected]>
Signed-off-by: Bhavana Goud Ramaram <[email protected]>
Co-authored-by: Pavan Yekbote <[email protected]>
Co-authored-by: Rithin Pullela <[email protected]>
Co-authored-by: zane-neo <[email protected]>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Co-authored-by: Peter Zhu <[email protected]>
Co-authored-by: opensearch-ci-bot <[email protected]>
Co-authored-by: Mingshi Liu <[email protected]>
Co-authored-by: Yaliang Wu <[email protected]>
Co-authored-by: Bhavana Goud Ramaram <[email protected]>
rcho93 pushed a commit to rcho93/ml-commons that referenced this pull request May 9, 2025
* support MCP session management

Signed-off-by: zane-neo <[email protected]>

* Addressing comments

Signed-off-by: zane-neo <[email protected]>

* add feature flag for mcp server and renaming mcp connector feature flag

Signed-off-by: zane-neo <[email protected]>

* Address critical comments in opensearch-project#3781

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
dhrubo-os pushed a commit that referenced this pull request May 12, 2025
* initial commit for MCP server in OpenSearch (#3781)

* initial commit for MCP server in OpenSearch

Signed-off-by: zane-neo <[email protected]>

* Make change to support register or remove tools across cluster

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* fix UT failure caused by code change

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* add license header

Signed-off-by: zane-neo <[email protected]>

* fix notifications initialized not respond issue

Signed-off-by: zane-neo <[email protected]>

* fix minor issues and add UTs

Signed-off-by: zane-neo <[email protected]>

* Add more UTs

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* support MCP session management (#3803)

* support MCP session management

Signed-off-by: zane-neo <[email protected]>

* Addressing comments

Signed-off-by: zane-neo <[email protected]>

* add feature flag for mcp server and renaming mcp connector feature flag

Signed-off-by: zane-neo <[email protected]>

* Address critical comments in #3781

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>

* Create prompt API & System Index for prompt

Signed-off-by: seungwon cho <[email protected]>

* spotless is applied & disabled wildcard import

Signed-off-by: seungwon cho <[email protected]>

* javadoc added & header added & minor code errors fixed

Signed-off-by: seungwon cho <[email protected]>

* Apply spotless

Signed-off-by: seungwon cho <[email protected]>

* apply spotless

Signed-off-by: seungwon cho <[email protected]>

* addressed comments

Signed-off-by: seungwon cho <[email protected]>

* apply spotless

Signed-off-by: seungwon cho <[email protected]>

* addresses comments

Signed-off-by: seungwon cho <[email protected]>

* apply spotless

Signed-off-by: seungwon cho <[email protected]>

* addressed comments

Signed-off-by: seungwon cho <[email protected]>

* solve gradle build issue

Signed-off-by: seungwon cho <[email protected]>

* add test cases for create-api

Signed-off-by: seungwon cho <[email protected]>

* fix javadoc test case failure

Signed-off-by: seungwon cho <[email protected]>

* fix javadoc error

Signed-off-by: seungwon cho <[email protected]>

* fix guava noclass issue

Signed-off-by: seungwon cho <[email protected]>

* fix jacocoTestCoverageVerification fail

Signed-off-by: seungwon cho <[email protected]>

* improve test coverages

Signed-off-by: seungwon cho <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
Signed-off-by: seungwon cho <[email protected]>
Co-authored-by: zane-neo <[email protected]>
akolarkunnu pushed a commit to akolarkunnu/ml-commons that referenced this pull request Jun 6, 2025
* support MCP session management

Signed-off-by: zane-neo <[email protected]>

* Addressing comments

Signed-off-by: zane-neo <[email protected]>

* add feature flag for mcp server and renaming mcp connector feature flag

Signed-off-by: zane-neo <[email protected]>

* Address critical comments in opensearch-project#3781

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
4 participants