[Bug fixes] Fix ListIndexTool and SearchIndexTool #3720

Merged
merged 12 commits into from
Apr 11, 2025
Original file line number Diff line number Diff line change
@@ -87,8 +87,8 @@ public MLToolSpec(StreamInput input) throws IOException {
configMap = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
}
this.tenantId = streamInputVersion.onOrAfter(VERSION_2_19_0) ? input.readOptionalString() : null;
if (input.available() > 0 && input.readBoolean()) {
parameters = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
if (input.getVersion().onOrAfter(VERSION_3_0_0) && input.available() > 0 && input.readBoolean()) {
attributes = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
Collaborator

Are attributes and parameters reading the same map? What is the intention?

    if (input.readBoolean()) {
        parameters = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
    }
    includeOutputInAgentResponse = input.readBoolean();
    if (input.getVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_FOR_TOOL_CONFIG) && input.readBoolean()) {
        configMap = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
    }
    this.tenantId = streamInputVersion.onOrAfter(VERSION_2_19_0) ? input.readOptionalString() : null;
    if (input.available() > 0 && input.readBoolean()) {
        attributes = input.readMap(StreamInput::readString, StreamInput::readOptionalString);
    }

Contributor Author

Earlier, parameters was being overwritten by the attributes map because we were reassigning parameters (a bug).

Now we assign it correctly to attributes.

The attributes map is the last item in the byte stream; readMap() is the method that populates a map from the byte stream.
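
To make the ordering concrete, here is a minimal Java sketch of the read path described in this reply. It is not the plugin's code: plain `java.io` data streams stand in for OpenSearch's `StreamInput`/`StreamOutput`, the class `ToolSpecWireSketch` and its helpers are hypothetical, map values are assumed to be non-null, and the version gate from the diff is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only: java.io streams, not OpenSearch's StreamInput/StreamOutput.
class ToolSpecWireSketch {
    Map<String, String> parameters;
    Map<String, String> attributes;

    // Writes an optional map as: presence flag, then size, then key/value pairs.
    static void writeOptionalMap(DataOutputStream out, Map<String, String> map) throws IOException {
        out.writeBoolean(map != null);
        if (map == null) {
            return;
        }
        out.writeInt(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue()); // assumption: values are non-null
        }
    }

    // Reads the size and the key/value pairs written by writeOptionalMap.
    static Map<String, String> readMap(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            map.put(key, value);
        }
        return map;
    }

    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeOptionalMap(out, parameters);
            writeOptionalMap(out, attributes); // attributes are the last item on the wire
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ToolSpecWireSketch deserialize(byte[] data) {
        ToolSpecWireSketch spec = new ToolSpecWireSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (in.readBoolean()) {
                spec.parameters = readMap(in);
            }
            // The bug amounted to storing this second map in `parameters` again.
            // Reads must mirror writes, so it belongs in `attributes`.
            if (in.available() > 0 && in.readBoolean()) {
                spec.attributes = readMap(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return spec;
    }

    public static void main(String[] args) {
        ToolSpecWireSketch spec = new ToolSpecWireSketch();
        spec.parameters = new HashMap<>();
        spec.parameters.put("index", "[\"index1\"]");
        spec.attributes = new HashMap<>();
        spec.attributes.put("strict", "true");

        ToolSpecWireSketch copy = deserialize(spec.serialize());
        System.out.println(copy.parameters + " " + copy.attributes);
    }
}
```

The point of the sketch is only that each field is read in the same order it was written and stored in its own variable; reading a field twice, or into the wrong variable, shifts every later read.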

Contributor Author

@pyek-bot Apr 10, 2025

I am noticing some odd behavior in a multi-node cluster. I will validate and get back to you.

Resolved: the issue was caused by reading attributes twice. This code is still valid.

}
}

Original file line number Diff line number Diff line change
@@ -693,6 +693,7 @@ private static void runTool(
tools.get(action).run(parameters, toolListener); // run tool
}
} catch (Exception e) {
log.error("Failed to run tool {}", action, e);
Collaborator

@mingshl Apr 9, 2025

On the next line, the error message is returned to the listener, and it is the same e.getMessage(). Is the log needed?

Contributor Author

Listener behavior might change over time and may stop exposing these details, so I think it's better to log the tool failure explicitly here, irrespective of the listener.

Collaborator

The log is for debugging, in case we later need a deep dive to identify the root cause. The listener is for notifying the customer at that moment.
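
The two channels discussed in this thread can be sketched as follows. This is an illustration under stated assumptions, not the agent runner's code: `java.util.logging` and `Consumer<String>` stand in for the project's logger and step listener, and the class name `ToolFailureSketch` is hypothetical. The response message format is copied from the diff.

```java
import java.util.Locale;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative sketch: stand-in logger and listener types, hypothetical class name.
class ToolFailureSketch {
    private static final Logger log = Logger.getLogger(ToolFailureSketch.class.getName());

    static void runTool(String action, Runnable tool, Consumer<String> nextStepListener) {
        try {
            tool.run();
            nextStepListener.accept("Tool " + action + " finished.");
        } catch (Exception e) {
            // Operator-facing channel: the full exception, including the stack trace,
            // goes to the log so the root cause can be investigated later.
            log.log(Level.SEVERE, "Failed to run tool " + action, e);
            // Caller-facing channel: only the message is passed on, right away.
            nextStepListener.accept(
                String.format(
                    Locale.ROOT,
                    "Failed to run the tool %s with the error message %s.",
                    action,
                    e.getMessage()
                )
            );
        }
    }

    public static void main(String[] args) {
        runTool(
            "ListIndexTool",
            () -> { throw new IllegalStateException("boom"); },
            System.out::println
        );
    }
}
```

Because the listener only receives `e.getMessage()`, the stack trace survives only in the log, which is the argument made above for keeping both.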

nextStepListener
.onResponse(String.format(Locale.ROOT, "Failed to run the tool %s with the error message %s.", action, e.getMessage()));
}
Original file line number Diff line number Diff line change
@@ -8,12 +8,13 @@
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.ml.common.utils.StringUtils.gson;

import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -34,6 +35,8 @@
@ToolAnnotation(IndexMappingTool.TYPE)
public class IndexMappingTool implements Tool {
public static final String TYPE = "IndexMappingTool";
public static final String INPUT_SCHEMA_FIELD = "input_schema";
public static final String STRICT_FIELD = "strict";
private static final String DEFAULT_DESCRIPTION = String
.join(
" ",
@@ -44,6 +47,12 @@ public class IndexMappingTool implements Tool {
"The mappings are in JSON format under the key 'properties' which includes the field name as a key and a JSON object with field type under the key 'type'.",
"The settings are in flattened map with 'index' as the top element and key-value pairs for each setting."
);
public static final String DEFAULT_INPUT_SCHEMA = "{\"type\":\"object\",\""
+ "properties\":{\"index\":{\"type\":\"array\",\"description\":\"OpenSearch index name list, separated by comma. "
+ "for example: [\\\"index1\\\", \\\"index2\\\"]\","
+ "\"items\":{\"type\":\"string\"}}},"
+ "\"required\":[\"index\"],"
+ "\"additionalProperties\":false}";

@Setter
@Getter
@@ -67,12 +76,8 @@ public IndexMappingTool(Client client) {
this.client = client;

this.attributes = new HashMap<>();
attributes
.put(
"input_schema",
"{\"type\":\"object\",\"properties\":{\"index\":{\"type\":\"string\",\"description\":\"OpenSearch index name\"}},\"required\":[\"index\"],\"additionalProperties\":false}"
);
attributes.put("strict", true);
attributes.put(INPUT_SCHEMA_FIELD, DEFAULT_INPUT_SCHEMA);
attributes.put(STRICT_FIELD, true);

outputParser = new Parser<>() {
@Override
@@ -86,75 +91,80 @@ public Object parse(Object o) {

@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
@SuppressWarnings("unchecked")
List<String> indexList = parameters.containsKey("index")
? gson.fromJson(parameters.get("index"), List.class)
: Collections.emptyList();
if (indexList.isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}
try {
List<String> indexList = new ArrayList<>();
if (StringUtils.isNotBlank(parameters.get("index"))) {
indexList = gson.fromJson(parameters.get("index"), List.class);
}

final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);
if (indexList.isEmpty()) {
@SuppressWarnings("unchecked")
Collaborator

Are you just trying to return a string in the listener response? Suppressing warnings is not recommended; try composing the string in a better way.

Collaborator

But I don't have the context on this listener. When the index list is empty, should it return an empty list or this string?

Try this?

if (indexList.isEmpty()) {
    String message = String.format("There were no results searching the index parameter [%s].", parameters.get("index"));
    listener.onResponse(createResponse(message));
    return;
}

Contributor Author

Most of this is existing code that has been moved (see above). To be honest, I lack the full context on why it was built this way.

Tools are executed with input provided by a model. That input is always a string, and the response is also passed back to the model as a string, so it is safe to assume the type is a string.

However, there is a concern if this object ever changes. I will try to see if it can be handled in a better way.

Collaborator

@ylwu-amzn Apr 10, 2025

@mingshl, this is to return the correct type T, as you can see on line 89: ActionListener<T> listener.

The ActionListener expects type T, not String. I would suggest a separate PR to enhance this part; there is no need to fix all issues in this PR.

Collaborator

@pyek-bot, let's create an issue to track this and fix it in a new PR?

Contributor Author

Sure, created issue: #3722

T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
Collaborator

How do you guarantee that T is actually String? Otherwise this will cause a ClassCastException at runtime.

Collaborator

This is existing code, but I think we can enhance the logic since we already know that indexList is empty.
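
As a side note on the cast discussed in this thread, the sketch below shows why `(T) someString` compiles with only an unchecked warning, and where it can fail. It illustrates Java's type erasure rather than the tool's actual call path: `Consumer<T>` stands in for `ActionListener<T>`, and the class and method names are hypothetical.

```java
import java.util.function.Consumer;

// Illustrative sketch: Consumer<T> stands in for ActionListener<T>.
class UncheckedCastSketch {

    @SuppressWarnings("unchecked")
    static <T> void respond(Consumer<T> listener) {
        // Because of erasure, this cast is not checked here, whatever T is.
        T response = (T) "no results";
        listener.accept(response);
    }

    // Returns true if handing the String to a listener typed for Integer fails.
    static boolean failsForIntegerListener() {
        try {
            UncheckedCastSketch.<Integer>respond(value -> {
                // The value is really a String, so treating it as an Integer fails.
                int next = value + 1;
                System.out.println(next);
            });
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Safe: the caller really expects a String.
        UncheckedCastSketch.<String>respond(s -> System.out.println(s));
        // Unsafe: the failure surfaces in the caller, not at the cast.
        System.out.println(failsForIntegerListener());
    }
}
```

The cast is therefore safe only as long as every caller instantiates `T` as `String`, which matches the assumption stated earlier in the thread that tool responses are always strings.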

listener.onResponse(empty);
return;
}

final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = Boolean.parseBoolean(parameters.get("local"));
final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);

ActionListener<GetIndexResponse> internalListener = new ActionListener<GetIndexResponse>() {
final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = Boolean.parseBoolean(parameters.getOrDefault("local", "false"));
final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;

@Override
public void onResponse(GetIndexResponse getIndexResponse) {
try {
// Handle empty response
if (getIndexResponse.indices().length == 0) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}
StringBuilder sb = new StringBuilder();
for (String index : getIndexResponse.indices()) {
sb.append("index: ").append(index).append("\n\n");

MappingMetadata mapping = getIndexResponse.mappings().get(index);
if (mapping != null) {
sb.append("mappings:\n");
for (Entry<String, Object> entry : mapping.sourceAsMap().entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append('\n');
}
sb.append("\n\n");
ActionListener<GetIndexResponse> internalListener = new ActionListener<GetIndexResponse>() {

@Override
public void onResponse(GetIndexResponse getIndexResponse) {
try {
// Handle empty response
if (getIndexResponse.indices().length == 0) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
Collaborator

Same here: can you avoid this manual @SuppressWarnings?

listener.onResponse(empty);
return;
}
StringBuilder sb = new StringBuilder();
for (String index : getIndexResponse.indices()) {
sb.append("index: ").append(index).append("\n\n");

MappingMetadata mapping = getIndexResponse.mappings().get(index);
if (mapping != null) {
sb.append("mappings:\n");
for (Entry<String, Object> entry : mapping.sourceAsMap().entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append('\n');
}
sb.append("\n\n");
}

Settings settings = getIndexResponse.settings().get(index);
if (settings != null) {
sb.append("settings:\n").append(settings.toDelimitedString('\n')).append("\n\n");
Settings settings = getIndexResponse.settings().get(index);
if (settings != null) {
sb.append("settings:\n").append(settings.toDelimitedString('\n')).append("\n\n");
}
}
}

@SuppressWarnings("unchecked")
T response = (T) sb.toString();
listener.onResponse(response);
} catch (Exception e) {
onFailure(e);
@SuppressWarnings("unchecked")
T response = (T) sb.toString();
Collaborator

Maybe using StringUtils.toString(), which takes an object, would help in this case?

listener.onResponse(response);
} catch (Exception e) {
onFailure(e);
}
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}

};
final GetIndexRequest getIndexRequest = new GetIndexRequest()
.indices(indices)
.indicesOptions(indicesOptions)
.local(local)
.clusterManagerNodeTimeout(clusterManagerNodeTimeout);
};
final GetIndexRequest getIndexRequest = new GetIndexRequest()
.indices(indices)
.indicesOptions(indicesOptions)
.local(local)
.clusterManagerNodeTimeout(clusterManagerNodeTimeout);

client.admin().indices().getIndex(getIndexRequest, internalListener);
client.admin().indices().getIndex(getIndexRequest, internalListener);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.ml.common.utils.StringUtils.gson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -23,6 +24,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -65,17 +67,24 @@
@ToolAnnotation(ListIndexTool.TYPE)
public class ListIndexTool implements Tool {
public static final String TYPE = "ListIndexTool";
public static final String INPUT_SCHEMA_FIELD = "input_schema";
public static final String STRICT_FIELD = "strict";
// This needs to be changed once it's changed in opensearch core in RestIndicesListAction.
private static final int MAX_SUPPORTED_LIST_INDICES_PAGE_SIZE = 5000;
public static final int DEFAULT_PAGE_SIZE = 100;
private static final String DEFAULT_DESCRIPTION = String
public static final String DEFAULT_DESCRIPTION = String
.join(
" ",
"This tool gets index information from the OpenSearch cluster.",
"It takes 2 optional arguments named `index` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),",
"It takes 2 optional arguments named `indices` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),",
"and `local` which means whether to return information from the local node only instead of the cluster manager node (default is false).",
"The tool returns the indices information, including `health`, `status`, `index`, `uuid`, `pri`, `rep`, `docs.count`, `docs.deleted`, `store.size`, `pri.store. size `, `pri.store.size`, `pri.store`."
);
public static final String DEFAULT_INPUT_SCHEMA = "{\"type\":\"object\","
+ "\"properties\":{\"indices\":{\"type\":\"array\",\"items\": {\"type\": \"string\"},"
+ "\"description\":\"OpenSearch index name list, separated by comma. "
+ "for example: [\\\"index1\\\", \\\"index2\\\"], use empty array [] to list all indices in the cluster\"}},"
+ "\"additionalProperties\":false}";

@Setter
@Getter
@@ -111,62 +120,67 @@ public Object parse(Object o) {
};

this.attributes = new HashMap<>();
attributes
.put(
"input_schema",
"{\"type\":\"object\",\"properties\":{\"indices\":{\"type\":\"string\",\"description\":\"OpenSearch index name list, separated by comma. for example: index1, index2\"}},\"additionalProperties\":false}"
);
attributes.put("strict", false);
attributes.put(INPUT_SCHEMA_FIELD, DEFAULT_INPUT_SCHEMA);
attributes.put(STRICT_FIELD, false);
}

@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
// TODO: This logic exactly matches the OpenSearch _list/indices REST action. If code at
// o.o.rest/action/list/RestIndicesListAction.java changes those changes need to be reflected here
@SuppressWarnings("unchecked")
List<String> indexList = parameters.containsKey("indices")
? gson.fromJson(parameters.get("indices"), List.class)
: Collections.emptyList();
final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);

final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local"));
final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments"));
final int pageSize = parameters.containsKey("page_size")
? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE)
: DEFAULT_PAGE_SIZE;
final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize);

final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> {
// Handle empty table
if (table == null || table.getRows().isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "].");
listener.onResponse(empty);
return;
try {
List<String> indexList = new ArrayList<>();
if (StringUtils.isNotBlank(parameters.get("indices"))) {
indexList = parameters.containsKey("indices")
? gson.fromJson(parameters.get("indices"), List.class)
: Collections.emptyList();
}
StringBuilder sb = new StringBuilder(
// Currently using c.value which is short header matching _cat/indices
// May prefer to use c.attr.get("desc") for full description
table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n"))
final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);

final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local"));
final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments"));
final int pageSize = parameters.containsKey("page_size")
? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE)
: DEFAULT_PAGE_SIZE;
final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize);

final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> {
// Handle empty table
if (table == null || table.getRows().isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "].");
listener.onResponse(empty);
return;
}
StringBuilder sb = new StringBuilder(
// Currently using c.value which is short header matching _cat/indices
// May prefer to use c.attr.get("desc") for full description
table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n"))
);
for (List<Cell> row : table.getRows()) {
sb
.append(
row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n"))
);
}
@SuppressWarnings("unchecked")
T response = (T) sb.toString();
listener.onResponse(response);
}, listener::onFailure));

fetchClusterInfoAndPages(
indices,
local,
includeUnloadedSegments,
pageParams,
indicesOptions,
new ConcurrentLinkedQueue<>(),
internalListener
);
for (List<Cell> row : table.getRows()) {
sb.append(row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n")));
}
@SuppressWarnings("unchecked")
T response = (T) sb.toString();
listener.onResponse(response);
}, listener::onFailure));

fetchClusterInfoAndPages(
indices,
local,
includeUnloadedSegments,
pageParams,
indicesOptions,
new ConcurrentLinkedQueue<>(),
internalListener
);
} catch (Exception e) {
listener.onFailure(e);
}
}

private void fetchClusterInfoAndPages(