Skip to content

Commit 3deebaa

Browse files
committed
initial commit for MCP server in OpenSearch (opensearch-project#3781)
* initial commit for MCP server in OpenSearch Signed-off-by: zane-neo <[email protected]> * Make change to support register or remove tools across cluster Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * fix UT failure caused by code change Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * format code Signed-off-by: zane-neo <[email protected]> * add license header Signed-off-by: zane-neo <[email protected]> * fix notifications initialized not respond issue Signed-off-by: zane-neo <[email protected]> * fix minor issues and add UTs Signed-off-by: zane-neo <[email protected]> * Add more UTs Signed-off-by: zane-neo <[email protected]> --------- Signed-off-by: zane-neo <[email protected]> (cherry picked from commit 9b0294d)
1 parent 3f503f1 commit 3deebaa

File tree

41 files changed

+3581
-20
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+3581
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.ml.common.transport.mcpserver.action;
7+
8+
import org.opensearch.action.ActionType;
9+
import org.opensearch.ml.common.transport.mcpserver.responses.register.MLMcpRegisterNodesResponse;
10+
11+
public class MLMcpToolsRegisterOnNodesAction extends ActionType<MLMcpRegisterNodesResponse> {
12+
public static MLMcpToolsRegisterOnNodesAction INSTANCE = new MLMcpToolsRegisterOnNodesAction();
13+
public static final String NAME = "cluster:admin/opensearch/ml/mcp_tools/register_on_nodes";
14+
15+
private MLMcpToolsRegisterOnNodesAction() {
16+
super(NAME, MLMcpRegisterNodesResponse::new);
17+
}
18+
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.ml.common.transport.mcpserver.action;
7+
8+
import org.opensearch.action.ActionType;
9+
import org.opensearch.ml.common.transport.mcpserver.responses.remove.MLMcpRemoveNodesResponse;
10+
11+
public class MLMcpToolsRemoveOnNodesAction extends ActionType<MLMcpRemoveNodesResponse> {
12+
public static MLMcpToolsRemoveOnNodesAction INSTANCE = new MLMcpToolsRemoveOnNodesAction();
13+
public static final String NAME = "cluster:admin/opensearch/ml/mcp_tools/remove_on_nodes";
14+
15+
private MLMcpToolsRemoveOnNodesAction() {
16+
super(NAME, MLMcpRemoveNodesResponse::new);
17+
}
18+
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.ml.common.transport.mcpserver.requests.register;
7+
8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.io.UncheckedIOException;
12+
13+
import org.opensearch.action.ActionRequest;
14+
import org.opensearch.action.ActionRequestValidationException;
15+
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
16+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
17+
import org.opensearch.core.common.io.stream.StreamInput;
18+
import org.opensearch.core.common.io.stream.StreamOutput;
19+
import org.opensearch.transport.TransportRequest;
20+
21+
import lombok.Builder;
22+
import lombok.Data;
23+
24+
@Data
25+
public class MLMcpToolsRegisterNodeRequest extends ActionRequest {
26+
private McpTools mcpTools;
27+
28+
public MLMcpToolsRegisterNodeRequest(StreamInput in) throws IOException {
29+
super(in);
30+
this.mcpTools = new McpTools(in);
31+
}
32+
33+
@Override
34+
public ActionRequestValidationException validate() {
35+
return null;
36+
}
37+
38+
@Builder
39+
public MLMcpToolsRegisterNodeRequest(McpTools mcpTools) {
40+
this.mcpTools = mcpTools;
41+
}
42+
43+
@Override
44+
public void writeTo(StreamOutput out) throws IOException {
45+
super.writeTo(out);
46+
mcpTools.writeTo(out);
47+
}
48+
49+
public static MLMcpToolsRegisterNodeRequest fromActionRequest(TransportRequest actionRequest) {
50+
if (actionRequest instanceof MLMcpToolsRegisterNodeRequest) {
51+
return (MLMcpToolsRegisterNodeRequest) actionRequest;
52+
}
53+
54+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
55+
actionRequest.writeTo(osso);
56+
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
57+
return new MLMcpToolsRegisterNodeRequest(input);
58+
}
59+
} catch (IOException e) {
60+
throw new UncheckedIOException("Failed to parse ActionRequest into MLMcpToolsRegisterNodeRequest", e);
61+
}
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package org.opensearch.ml.common.transport.mcpserver.requests.register;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.ByteArrayOutputStream;
5+
import java.io.IOException;
6+
import java.io.UncheckedIOException;
7+
8+
import org.opensearch.action.ActionRequest;
9+
import org.opensearch.action.ActionRequestValidationException;
10+
import org.opensearch.action.support.nodes.BaseNodesRequest;
11+
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
12+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
import org.opensearch.core.common.util.CollectionUtils;
16+
17+
import lombok.Data;
18+
19+
@Data
20+
public class MLMcpToolsRegisterNodesRequest extends BaseNodesRequest<MLMcpToolsRegisterNodesRequest> {
21+
private McpTools mcpTools;
22+
23+
public MLMcpToolsRegisterNodesRequest(StreamInput in) throws IOException {
24+
super(in);
25+
this.mcpTools = new McpTools(in);
26+
}
27+
28+
public MLMcpToolsRegisterNodesRequest(String[] nodeIds, McpTools mcpTools) {
29+
super(nodeIds);
30+
this.mcpTools = mcpTools;
31+
}
32+
33+
public void writeTo(StreamOutput out) throws IOException {
34+
super.writeTo(out);
35+
mcpTools.writeTo(out);
36+
}
37+
38+
@Override
39+
public ActionRequestValidationException validate() {
40+
if (CollectionUtils.isEmpty(mcpTools.getTools())) {
41+
ActionRequestValidationException exception = new ActionRequestValidationException();
42+
exception.addValidationError("tools list can not be null");
43+
return exception;
44+
}
45+
return null;
46+
}
47+
48+
public static MLMcpToolsRegisterNodesRequest fromActionRequest(ActionRequest actionRequest) {
49+
if (actionRequest instanceof MLMcpToolsRegisterNodesRequest) {
50+
return (MLMcpToolsRegisterNodesRequest) actionRequest;
51+
}
52+
53+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
54+
actionRequest.writeTo(osso);
55+
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
56+
return new MLMcpToolsRegisterNodesRequest(input);
57+
}
58+
} catch (IOException e) {
59+
throw new UncheckedIOException("Failed to parse ActionRequest into MLMcpToolsRegisterRequest", e);
60+
}
61+
}
62+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.ml.common.transport.mcpserver.requests.register;
9+
10+
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
11+
12+
import java.io.IOException;
13+
import java.util.Map;
14+
15+
import org.opensearch.core.common.io.stream.StreamInput;
16+
import org.opensearch.core.common.io.stream.StreamOutput;
17+
import org.opensearch.core.common.io.stream.Writeable;
18+
import org.opensearch.core.xcontent.ToXContent;
19+
import org.opensearch.core.xcontent.ToXContentObject;
20+
import org.opensearch.core.xcontent.XContentBuilder;
21+
import org.opensearch.core.xcontent.XContentParser;
22+
23+
import lombok.Data;
24+
import lombok.extern.log4j.Log4j2;
25+
26+
/**
27+
* This class represents a tool that can be registered with OpenSearch. It contains information about the tool's name,
28+
* description, parameters, and schema.
29+
*/
30+
@Log4j2
31+
@Data
32+
public class McpTool implements ToXContentObject, Writeable {
33+
private static final String NAME_FIELD = "name";
34+
private static final String DESCRIPTION_FIELD = "description";
35+
private static final String PARAMS_FIELD = "params";
36+
private static final String SCHEMA_FIELD = "schema";
37+
private final String name;
38+
private final String description;
39+
private Map<String, Object> params;
40+
private Map<String, Object> schema;
41+
private static final String nameNotShownExceptionMessage = "name field required";
42+
43+
public McpTool(StreamInput streamInput) throws IOException {
44+
name = streamInput.readString();
45+
if (name == null) {
46+
throw new IllegalArgumentException(nameNotShownExceptionMessage);
47+
}
48+
description = streamInput.readOptionalString();
49+
if (streamInput.readBoolean()) {
50+
params = streamInput.readMap(StreamInput::readString, StreamInput::readGenericValue);
51+
}
52+
if (streamInput.readBoolean()) {
53+
schema = streamInput.readMap(StreamInput::readString, StreamInput::readGenericValue);
54+
}
55+
}
56+
57+
public McpTool(String name, String description, Map<String, Object> params, Map<String, Object> schema) {
58+
if (name == null) {
59+
throw new IllegalArgumentException(nameNotShownExceptionMessage);
60+
}
61+
this.name = name;
62+
this.description = description;
63+
this.params = params;
64+
this.schema = schema;
65+
}
66+
67+
public static McpTool parse(XContentParser parser) throws IOException {
68+
String name = null;
69+
String description = null;
70+
Map<String, Object> params = null;
71+
Map<String, Object> schema = null;
72+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
73+
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
74+
String fieldName = parser.currentName();
75+
parser.nextToken();
76+
77+
switch (fieldName) {
78+
case NAME_FIELD:
79+
name = parser.text();
80+
break;
81+
case DESCRIPTION_FIELD:
82+
description = parser.text();
83+
break;
84+
case PARAMS_FIELD:
85+
params = parser.map();
86+
break;
87+
case SCHEMA_FIELD:
88+
schema = parser.map();
89+
break;
90+
default:
91+
parser.skipChildren();
92+
break;
93+
}
94+
}
95+
if (name == null) {
96+
throw new IllegalArgumentException(nameNotShownExceptionMessage);
97+
}
98+
return new McpTool(name, description, params, schema);
99+
}
100+
101+
@Override
102+
public void writeTo(StreamOutput streamOutput) throws IOException {
103+
streamOutput.writeString(name);
104+
streamOutput.writeOptionalString(description);
105+
if (params != null) {
106+
streamOutput.writeBoolean(true);
107+
streamOutput.writeMap(params, StreamOutput::writeString, StreamOutput::writeGenericValue);
108+
} else {
109+
streamOutput.writeBoolean(false);
110+
}
111+
112+
if (schema != null) {
113+
streamOutput.writeBoolean(true);
114+
streamOutput.writeMap(schema, StreamOutput::writeString, StreamOutput::writeGenericValue);
115+
} else {
116+
streamOutput.writeBoolean(false);
117+
}
118+
}
119+
120+
@Override
121+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xcontentParams) throws IOException {
122+
builder.startObject();
123+
builder.field(NAME_FIELD, name);
124+
if (description != null) {
125+
builder.field(DESCRIPTION_FIELD, description);
126+
}
127+
if (params != null && !params.isEmpty()) {
128+
builder.field(PARAMS_FIELD, params);
129+
}
130+
if (schema != null && !schema.isEmpty()) {
131+
builder.field(SCHEMA_FIELD, schema);
132+
}
133+
builder.endObject();
134+
return builder;
135+
}
136+
}

0 commit comments

Comments
 (0)