|
8 | 8 | import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
|
9 | 9 | import static org.opensearch.ml.common.utils.StringUtils.gson;
|
10 | 10 |
|
| 11 | +import java.util.ArrayList; |
11 | 12 | import java.util.Collection;
|
12 | 13 | import java.util.Collections;
|
13 | 14 | import java.util.HashMap;
|
|
23 | 24 | import java.util.stream.Collectors;
|
24 | 25 | import java.util.stream.StreamSupport;
|
25 | 26 |
|
| 27 | +import org.apache.commons.lang3.StringUtils; |
26 | 28 | import org.apache.commons.lang3.math.NumberUtils;
|
27 | 29 | import org.apache.logging.log4j.util.Strings;
|
28 | 30 | import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
|
|
65 | 67 | @ToolAnnotation(ListIndexTool.TYPE)
|
66 | 68 | public class ListIndexTool implements Tool {
|
67 | 69 | public static final String TYPE = "ListIndexTool";
|
| 70 | + public static final String INPUT_SCHEMA_FIELD = "input_schema"; |
| 71 | + public static final String STRICT_FIELD = "strict"; |
68 | 72 | // This needs to be changed once it's changed in opensearch core in RestIndicesListAction.
|
69 | 73 | private static final int MAX_SUPPORTED_LIST_INDICES_PAGE_SIZE = 5000;
|
70 | 74 | public static final int DEFAULT_PAGE_SIZE = 100;
|
71 |
| - private static final String DEFAULT_DESCRIPTION = String |
| 75 | + public static final String DEFAULT_DESCRIPTION = String |
72 | 76 | .join(
|
73 | 77 | " ",
|
74 | 78 | "This tool gets index information from the OpenSearch cluster.",
|
75 |
| - "It takes 2 optional arguments named `index` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),", |
| 79 | + "It takes 2 optional arguments named `indices` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),", |
76 | 80 | "and `local` which means whether to return information from the local node only instead of the cluster manager node (default is false).",
|
77 | 81 | "The tool returns the indices information, including `health`, `status`, `index`, `uuid`, `pri`, `rep`, `docs.count`, `docs.deleted`, `store.size`, `pri.store. size `, `pri.store.size`, `pri.store`."
|
78 | 82 | );
|
| 83 | + public static final String DEFAULT_INPUT_SCHEMA = "{\"type\":\"object\"," |
| 84 | + + "\"properties\":{\"indices\":{\"type\":\"array\",\"items\": {\"type\": \"string\"}," |
| 85 | + + "\"description\":\"OpenSearch index name list, separated by comma. " |
| 86 | + + "for example: [\\\"index1\\\", \\\"index2\\\"], use empty array [] to list all indices in the cluster\"}}," |
| 87 | + + "\"additionalProperties\":false}"; |
79 | 88 |
|
80 | 89 | @Setter
|
81 | 90 | @Getter
|
@@ -111,62 +120,67 @@ public Object parse(Object o) {
|
111 | 120 | };
|
112 | 121 |
|
113 | 122 | this.attributes = new HashMap<>();
|
114 |
| - attributes |
115 |
| - .put( |
116 |
| - "input_schema", |
117 |
| - "{\"type\":\"object\",\"properties\":{\"indices\":{\"type\":\"string\",\"description\":\"OpenSearch index name list, separated by comma. for example: index1, index2\"}},\"additionalProperties\":false}" |
118 |
| - ); |
119 |
| - attributes.put("strict", false); |
| 123 | + attributes.put(INPUT_SCHEMA_FIELD, DEFAULT_INPUT_SCHEMA); |
| 124 | + attributes.put(STRICT_FIELD, false); |
120 | 125 | }
|
121 | 126 |
|
122 | 127 | @Override
|
123 | 128 | public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
|
124 | 129 | // TODO: This logic exactly matches the OpenSearch _list/indices REST action. If code at
|
125 | 130 | // o.o.rest/action/list/RestIndicesListAction.java changes those changes need to be reflected here
|
126 |
| - @SuppressWarnings("unchecked") |
127 |
| - List<String> indexList = parameters.containsKey("indices") |
128 |
| - ? gson.fromJson(parameters.get("indices"), List.class) |
129 |
| - : Collections.emptyList(); |
130 |
| - final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY); |
131 |
| - |
132 |
| - final IndicesOptions indicesOptions = IndicesOptions.strictExpand(); |
133 |
| - final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local")); |
134 |
| - final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments")); |
135 |
| - final int pageSize = parameters.containsKey("page_size") |
136 |
| - ? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE) |
137 |
| - : DEFAULT_PAGE_SIZE; |
138 |
| - final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize); |
139 |
| - |
140 |
| - final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> { |
141 |
| - // Handle empty table |
142 |
| - if (table == null || table.getRows().isEmpty()) { |
143 |
| - @SuppressWarnings("unchecked") |
144 |
| - T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "]."); |
145 |
| - listener.onResponse(empty); |
146 |
| - return; |
| 131 | + try { |
| 132 | + List<String> indexList = new ArrayList<>(); |
| 133 | + if (StringUtils.isNotBlank(parameters.get("indices"))) { |
| 134 | + indexList = parameters.containsKey("indices") |
| 135 | + ? gson.fromJson(parameters.get("indices"), List.class) |
| 136 | + : Collections.emptyList(); |
147 | 137 | }
|
148 |
| - StringBuilder sb = new StringBuilder( |
149 |
| - // Currently using c.value which is short header matching _cat/indices |
150 |
| - // May prefer to use c.attr.get("desc") for full description |
151 |
| - table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 138 | + final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY); |
| 139 | + |
| 140 | + final IndicesOptions indicesOptions = IndicesOptions.strictExpand(); |
| 141 | + final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local")); |
| 142 | + final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments")); |
| 143 | + final int pageSize = parameters.containsKey("page_size") |
| 144 | + ? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE) |
| 145 | + : DEFAULT_PAGE_SIZE; |
| 146 | + final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize); |
| 147 | + |
| 148 | + final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> { |
| 149 | + // Handle empty table |
| 150 | + if (table == null || table.getRows().isEmpty()) { |
| 151 | + @SuppressWarnings("unchecked") |
| 152 | + T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "]."); |
| 153 | + listener.onResponse(empty); |
| 154 | + return; |
| 155 | + } |
| 156 | + StringBuilder sb = new StringBuilder( |
| 157 | + // Currently using c.value which is short header matching _cat/indices |
| 158 | + // May prefer to use c.attr.get("desc") for full description |
| 159 | + table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 160 | + ); |
| 161 | + for (List<Cell> row : table.getRows()) { |
| 162 | + sb |
| 163 | + .append( |
| 164 | + row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 165 | + ); |
| 166 | + } |
| 167 | + @SuppressWarnings("unchecked") |
| 168 | + T response = (T) sb.toString(); |
| 169 | + listener.onResponse(response); |
| 170 | + }, listener::onFailure)); |
| 171 | + |
| 172 | + fetchClusterInfoAndPages( |
| 173 | + indices, |
| 174 | + local, |
| 175 | + includeUnloadedSegments, |
| 176 | + pageParams, |
| 177 | + indicesOptions, |
| 178 | + new ConcurrentLinkedQueue<>(), |
| 179 | + internalListener |
152 | 180 | );
|
153 |
| - for (List<Cell> row : table.getRows()) { |
154 |
| - sb.append(row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n"))); |
155 |
| - } |
156 |
| - @SuppressWarnings("unchecked") |
157 |
| - T response = (T) sb.toString(); |
158 |
| - listener.onResponse(response); |
159 |
| - }, listener::onFailure)); |
160 |
| - |
161 |
| - fetchClusterInfoAndPages( |
162 |
| - indices, |
163 |
| - local, |
164 |
| - includeUnloadedSegments, |
165 |
| - pageParams, |
166 |
| - indicesOptions, |
167 |
| - new ConcurrentLinkedQueue<>(), |
168 |
| - internalListener |
169 |
| - ); |
| 181 | + } catch (Exception e) { |
| 182 | + listener.onFailure(e); |
| 183 | + } |
170 | 184 | }
|
171 | 185 |
|
172 | 186 | private void fetchClusterInfoAndPages(
|
|
0 commit comments