-
Notifications
You must be signed in to change notification settings - Fork 165
support MCP session management #3803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support MCP session management #3803
Conversation
Signed-off-by: zane-neo <[email protected]>
|
||
@Override | ||
public void handleException(TransportException e) { | ||
System.out.println("got exception:" + e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove debugging code ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Override | ||
public void handleException(TransportException e) { | ||
System.out.println("got exception:" + e.getMessage()); | ||
log.error("got exception: ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got exception
is too simple, Make error message more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
} catch (IOException ex) { | ||
log.error("Failed to send exception response to client during message handling due to IOException"); | ||
log.error("Failed to get the session management index result with sessionId: {}", sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error message is misleading. Here the exception should be some failure when send exception response to client. I think the original exception make sense.
This error message should be added after link 235.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the error message when getIndex request failed, so we need to change the message here
|
||
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { | ||
Map<String, DiscoveryNode> nodes = new HashMap<>(clusterService.state().nodes().getNodes()); | ||
nodes.remove(clusterService.localNode().getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove local node ? I see this transport action will be used in
if (clusterService.localNode().getId().equals(nodeId)) {
client
.execute(
MLMcpMessageAction.INSTANCE,
new MLMcpMessageRequest(sessionId, requestBody),
actionListener
);
Can you add some comments ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious about the same, I see that we are picking the first node all the time:
clusterService.state().nodes().getNodes().get(nodes.keySet().stream().findFirst().get()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are test code, removing
@Override | ||
protected void doExecute(Task task, ActionRequest request, ActionListener<AcknowledgedResponse> listener) { | ||
MLMcpMessageRequest mlMcpMessageRequest = MLMcpMessageRequest.fromActionRequest(request); | ||
final StreamingRestChannel channel = McpAsyncServerHolder.CHANNELS.get(mlMcpMessageRequest.getSessionId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use REST channel in transport action? Is this ok ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest channels here represents persistent connections, it's ok to use them anywhere, but we need somewhere to close them, will add this logic.
@@ -21,6 +26,8 @@ public class McpAsyncServerHolder { | |||
new ObjectMapper() | |||
); | |||
|
|||
public static Map<String, StreamingRestChannel> CHANNELS = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the session will be removed ? I don't see remove logic
When/should we close the StreamingRestChannel ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the lifecycle of MCP protocol, we should close the channel when client disconnected, but I'm not sure if we have an approach to listen to this event. This doesn't have big impact for now so we can deprioritize this.
Have you addressed all comments from last PR #3781 ? #3781 (comment), I don't see setting to enable/disable this experimental feature Suggest add your comment on last PR if the comment already addressed in this PR |
} | ||
); | ||
if (clusterService.localNode().getId().equals(nodeId)) { | ||
client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this ? I see same code in else block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see the same in else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you check the first commit, you can see such same code
client
.execute(
MLMcpMessageAction.INSTANCE,
new MLMcpMessageRequest(sessionId, requestBody),
actionListener
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you removed such code in the second commit for (clusterService.localNode().getId().equals(nodeId))
. I think such code not needed for local node case, right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first I thought you mean the clusterService.localNode().getId().equals(nodeId)
in else. I removed the part you mentioned, it's not needed.
String result = String.format("/sse/message?sessionId=%s", sessionId); | ||
return Mono.just(createHttpChunk(ENDPOINT_EVENT_TYPE, result)); | ||
ActionListener<IndexResponse> actionListener = ActionListener.wrap(r -> { | ||
if (r != null && r.status() == RestStatus.CREATED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if the status is not CREATED
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, added else logic.
"node_id": { | ||
"type": "keyword" | ||
}, | ||
"status": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the expected values for status
? I see active
and created
being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Values are: active and inactive. Now code has only active, created is the rest response status.
|
||
@Override | ||
public String executor() { | ||
return SAME; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious, what does this achieve?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which thread will be use to handle the transport response.
channel.sendResponse(new BytesRestResponse(channel, new Exception(e))); | ||
listener.onFailure(new Exception(e)); | ||
} catch (IOException ex) { | ||
log.error("Failed to send exception response to client during message handling due to IOException"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log the exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's logged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, i meant to log the actual exception log.error("...", ex)
anyways, it's a nit, can handle this later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
O, I mean I'll add this log, and I've logged in the comment addressing commit.
String nodeId = String.valueOf(r.getSourceAsMap().get("node_id")); | ||
DiscoveryNode node = clusterService.state().getNodes().getNodes().get(nodeId); | ||
if (node == null) { | ||
log.error("The node:{} is no longer in the current cluster, can not handle the mcp request", r.getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be nodeId
and not r.getId()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
log.debug("MCP request has been dispatched to corresponding node and handled successfully!"); | ||
if (requestBody.contains("notifications/initialized")) { | ||
log | ||
.debug( | ||
"Starting to send OK response for notifications/initialized request in coordinator node" | ||
); | ||
channel.sendChunk(createInitializedNotificationRes()); | ||
} | ||
} | ||
}, | ||
e -> { | ||
log | ||
.error( | ||
"MCP request has been dispatched to corresponding node but peer node failed to handle it", | ||
e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's log the nodeId here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved to unblock RC2. Please address all comments in new PR
CI failed
|
Signed-off-by: zane-neo <[email protected]>
Signed-off-by: zane-neo <[email protected]>
These comments are been address in this PR. |
@@ -92,26 +93,26 @@ public static McpTool parse(XContentParser parser) throws IOException { | |||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should these fields also be changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which fields? Not following.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 85-89
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approve to unblock 3.0 RC2
* support MCP session management Signed-off-by: zane-neo <[email protected]> * Addressing comments Signed-off-by: zane-neo <[email protected]> * add feature flag for mcp server and renaming mcp connector feature flag Signed-off-by: zane-neo <[email protected]> * Address critical comments in #3781 Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> (cherry picked from commit 7c05295)
* support MCP session management * Addressing comments * add feature flag for mcp server and renaming mcp connector feature flag * Address critical comments in #3781 --------- (cherry picked from commit 7c05295) Signed-off-by: zane-neo <[email protected]> Co-authored-by: zane-neo <[email protected]>
* [BUG] Agent Framework: Handle model response when toolUse is not accompanied by text (#3755) * fix: handle model response when toolUse is not accompanied by text Signed-off-by: Pavan Yekbote <[email protected]> * feat: add test case for parseLLMOutput Signed-off-by: Pavan Yekbote <[email protected]> --------- Signed-off-by: Pavan Yekbote <[email protected]> * [BUG] Allow user to control react agent max_interations value to prevent empty response (#3756) * fix: expose max_iteration for react Signed-off-by: Pavan Yekbote <[email protected]> * fix: defaults for agent execution and differentiate between step and step result Signed-off-by: Pavan Yekbote <[email protected]> * fix: return react agent id in agent response to expose more details Signed-off-by: Pavan Yekbote <[email protected]> * spotless Signed-off-by: Pavan Yekbote <[email protected]> * fix: remove test prompt from react system prompt Signed-off-by: Pavan Yekbote <[email protected]> * refactor: rename parameters exposed to user to executor Signed-off-by: Pavan Yekbote <[email protected]> * fix: give user complete control over planner system prompt Signed-off-by: Pavan Yekbote <[email protected]> --------- Signed-off-by: Pavan Yekbote <[email protected]> * Clean up JSM from MCP (#3773) Signed-off-by: rithin-pullela-aws <[email protected]> * [Bug] ListTools call does not return tool attributes (#3785) * initial commit for MCP server in OpenSearch (#3781) * initial commit for MCP server in OpenSearch Signed-off-by: zane-neo <[email protected]> * Make change to support register or remove tools across cluster Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * fix UT failure caused by code change Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * add license header Signed-off-by: zane-neo <[email protected]> * fix notifications initialized not respond issue Signed-off-by: zane-neo <[email protected]> * fix minor issues and add UTs Signed-off-by: zane-neo <[email protected]> * Add more UTs Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * Remove beta1 qualifier (#3794) (#3795) (cherry picked from commit 3f503f1) Signed-off-by: Peter Zhu <[email protected]> Co-authored-by: Peter Zhu <[email protected]> * [AUTO] Increment version to 3.1.0-SNAPSHOT (#3789) * Increment version to 3.1.0-SNAPSHOT Signed-off-by: opensearch-ci-bot <[email protected]> * Update build.gradle Signed-off-by: Peter Zhu <[email protected]> --------- Signed-off-by: opensearch-ci-bot <[email protected]> Signed-off-by: Peter Zhu <[email protected]> Co-authored-by: opensearch-ci-bot <[email protected]> Co-authored-by: Peter Zhu <[email protected]> * add release note for 3.0 (#3792) Signed-off-by: Mingshi Liu <[email protected]> * support MCP session management (#3803) * support MCP session management Signed-off-by: zane-neo <[email protected]> * Addressing comments Signed-off-by: zane-neo <[email protected]> * add feature flag for mcp server and renaming mcp connector feature flag Signed-off-by: zane-neo <[email protected]> * Address critical comments in #3781 Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * upgrade http client to version align with core (#3809) * upgrade http client to versoin align with core Signed-off-by: zane-neo <[email protected]> * upgrade httpclient-h2 to correct versiono Signed-off-by: zane-neo <[email protected]> * use placeholder approach Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * support customized message endpoint and addressing comments (#3810) * support customized message endpoint and addressing comments Signed-off-by: zane-neo <[email protected]> * fix UT failures Signed-off-by: zane-neo <[email protected]> * add files to jacoco exception Signed-off-by: zane-neo <[email protected]> * fix tool name issue and optimize register tool api Signed-off-by: zane-neo <[email protected]> * fix schema not parsed correctly issue and NPE when parameters is null Signed-off-by: zane-neo <[email protected]> * fix failure UT Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * excluding circuit breaker for Agent (#3814) Signed-off-by: Dhrubo Saha <[email protected]> * change release note (#3811) * change release note Signed-off-by: zane-neo <[email protected]> * Update opensearch-ml-common.release-notes-3.0.0.0.md * Update opensearch-ml-common.release-notes-3.0.0.0.md * Update opensearch-ml-common.release-notes-3.0.0.0.md --------- Signed-off-by: zane-neo <[email protected]> Co-authored-by: Peter Zhu <[email protected]> * Downgrade MCP version to 0.9 (#3821) Signed-off-by: rithin-pullela-aws <[email protected]> * remove libs folder (#3824) Signed-off-by: Yaliang Wu <[email protected]> * add more logging to deploy/undeploy flows for better debugging (#3825) * add more logging to deploy/undeploy flows for better debugging Signed-off-by: Bhavana Goud Ramaram <[email protected]> * Fix python client not able to connect to MCP server issue (#3822) Signed-off-by: zane-neo <[email protected]> Co-authored-by: Dhrubo Saha <[email protected]> * exclude trusted connector check for hidden model (#3838) Signed-off-by: Dhrubo Saha <[email protected]> * adding tenantId to the connector executor when this is inline connector (#3837) * adding tenantId to the connector executor when this is inline connector Signed-off-by: Dhrubo Saha <[email protected]> * added more unit tests Signed-off-by: Dhrubo Saha <[email protected]> --------- Signed-off-by: Dhrubo Saha <[email protected]> --------- Signed-off-by: Pavan Yekbote <[email protected]> Signed-off-by: rithin-pullela-aws <[email protected]> Signed-off-by: zane-neo <[email protected]> Signed-off-by: Peter Zhu <[email protected]> Signed-off-by: opensearch-ci-bot <[email protected]> Signed-off-by: Mingshi Liu <[email protected]> Signed-off-by: Dhrubo Saha <[email protected]> Signed-off-by: Yaliang Wu <[email protected]> Signed-off-by: Bhavana Goud Ramaram <[email protected]> Co-authored-by: Pavan Yekbote <[email protected]> Co-authored-by: Rithin Pullela <[email protected]> Co-authored-by: zane-neo <[email protected]> Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Peter Zhu <[email protected]> Co-authored-by: opensearch-ci-bot <[email protected]> Co-authored-by: Mingshi Liu <[email protected]> Co-authored-by: Yaliang Wu <[email protected]> Co-authored-by: Bhavana Goud Ramaram <[email protected]>
* support MCP session management Signed-off-by: zane-neo <[email protected]> * Addressing comments Signed-off-by: zane-neo <[email protected]> * add feature flag for mcp server and renaming mcp connector feature flag Signed-off-by: zane-neo <[email protected]> * Address critical comments in opensearch-project#3781 Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]>
* initial commit for MCP server in OpenSearch (#3781) * initial commit for MCP server in OpenSearch Signed-off-by: zane-neo <[email protected]> * Make change to support register or remove tools across cluster Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * fix UT failure caused by code change Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * add license header Signed-off-by: zane-neo <[email protected]> * fix notifications initialized not respond issue Signed-off-by: zane-neo <[email protected]> * fix minor issues and add UTs Signed-off-by: zane-neo <[email protected]> * Add more UTs Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * support MCP session management (#3803) * support MCP session management Signed-off-by: zane-neo <[email protected]> * Addressing comments Signed-off-by: zane-neo <[email protected]> * add feature flag for mcp server and renaming mcp connector feature flag Signed-off-by: zane-neo <[email protected]> * Address critical comments in #3781 Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> * Create prompt API & System Index for prompt Signed-off-by: seungwon cho <[email protected]> * spotless is applied & disabled wildcard import Signed-off-by: seungwon cho <[email protected]> * javadoc added & header added & minor code errors fixed Signed-off-by: seungwon cho <[email protected]> * Apply spotless Signed-off-by: seungwon cho <[email protected]> * apply spotless Signed-off-by: seungwon cho <[email protected]> * addressed comments Signed-off-by: seungwon cho <[email protected]> * apply spotless Signed-off-by: seungwon cho <[email protected]> * addresses comments Signed-off-by: seungwon cho <[email protected]> * apply spotless Signed-off-by: seungwon cho <[email protected]> * addressed comments Signed-off-by: seungwon cho <[email protected]> * solve gradle build issue Signed-off-by: seungwon cho <[email protected]> * add test cases for create-api Signed-off-by: seungwon cho <[email protected]> * fix javadoc test case failure Signed-off-by: seungwon cho <[email protected]> * fix javadoc error Signed-off-by: seungwon cho <[email protected]> * fix guava noclass issue Signed-off-by: seungwon cho <[email protected]> * fix jacocoTestCoverageVerification fail Signed-off-by: seungwon cho <[email protected]> * improve test coverages Signed-off-by: seungwon cho <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> Signed-off-by: seungwon cho <[email protected]> Co-authored-by: zane-neo <[email protected]>
* support MCP session management Signed-off-by: zane-neo <[email protected]> * Addressing comments Signed-off-by: zane-neo <[email protected]> * add feature flag for mcp server and renaming mcp connector feature flag Signed-off-by: zane-neo <[email protected]> * Address critical comments in opensearch-project#3781 Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
Description
This PR is to support MCP session management to fix the issue in cluster case, the session could lose if client sends message to load balancer or random coordinators.
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.