Skip to content

Commit 2bdddb6

Browse files
joshuali925goyamegh
andcommitted
Add direct-query module for prometheus integration
Co-authored-by: Megha Goyal <[email protected]> Signed-off-by: Joshua Li <[email protected]>
1 parent a64eefe commit 2bdddb6

23 files changed

+2723
-3
lines changed

direct-query/build.gradle

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
plugins {
7+
id 'java-library'
8+
id "io.freefair.lombok"
9+
id 'jacoco'
10+
}
11+
12+
repositories {
13+
mavenCentral()
14+
}
15+
16+
dependencies {
17+
api project(':core')
18+
api project(':direct-query-core')
19+
implementation project(':protocol')
20+
implementation project(':opensearch')
21+
implementation project(':datasources')
22+
implementation project(':direct-query-core')
23+
implementation project(':async-query-core')
24+
25+
// Common dependencies
26+
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
27+
implementation group: 'org.json', name: 'json', version: '20231013'
28+
implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"
29+
30+
// Test dependencies
31+
testImplementation(platform("org.junit:junit-bom:5.9.3"))
32+
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
33+
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
34+
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
35+
36+
testCompileOnly('junit:junit:4.13.1') {
37+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
38+
}
39+
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
40+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
41+
}
42+
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
43+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
44+
}
45+
testImplementation("org.opensearch.test:framework:${opensearch_version}")
46+
}
47+
48+
test {
49+
useJUnitPlatform()
50+
testLogging {
51+
events "failed"
52+
exceptionFormat "full"
53+
}
54+
}
55+
task junit4(type: Test) {
56+
useJUnitPlatform {
57+
includeEngines("junit-vintage")
58+
}
59+
systemProperty 'tests.security.manager', 'false'
60+
testLogging {
61+
events "failed"
62+
exceptionFormat "full"
63+
}
64+
}
65+
66+
jacocoTestReport {
67+
dependsOn test, junit4
68+
executionData test, junit4
69+
reports {
70+
html.required = true
71+
xml.required = true
72+
}
73+
afterEvaluate {
74+
classDirectories.setFrom(files(classDirectories.files.collect {
75+
fileTree(dir: it)
76+
}))
77+
}
78+
}
79+
80+
jacocoTestCoverageVerification {
81+
dependsOn test, junit4
82+
executionData test, junit4
83+
violationRules {
84+
rule {
85+
element = 'CLASS'
86+
excludes = [
87+
'org.opensearch.sql.directquery.transport.model.*'
88+
]
89+
limit {
90+
counter = 'LINE'
91+
minimum = 1.0
92+
}
93+
limit {
94+
counter = 'BRANCH'
95+
minimum = 1.0
96+
}
97+
}
98+
}
99+
afterEvaluate {
100+
classDirectories.setFrom(files(classDirectories.files.collect {
101+
fileTree(dir: it)
102+
}))
103+
}
104+
}
105+
check.dependsOn jacocoTestCoverageVerification
106+
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.directquery.rest;
7+
8+
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
9+
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
10+
import static org.opensearch.rest.RestRequest.Method.POST;
11+
12+
import com.google.common.collect.ImmutableList;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Objects;
16+
import lombok.Getter;
17+
import lombok.RequiredArgsConstructor;
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.opensearch.OpenSearchException;
21+
import org.opensearch.core.action.ActionListener;
22+
import org.opensearch.core.rest.RestStatus;
23+
import org.opensearch.rest.BaseRestHandler;
24+
import org.opensearch.rest.BytesRestResponse;
25+
import org.opensearch.rest.RestChannel;
26+
import org.opensearch.rest.RestRequest;
27+
import org.opensearch.sql.common.setting.Settings;
28+
import org.opensearch.sql.datasource.client.exceptions.DataSourceClientException;
29+
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
30+
import org.opensearch.sql.datasources.utils.Scheduler;
31+
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
32+
import org.opensearch.sql.directquery.transport.TransportExecuteDirectQueryRequestAction;
33+
import org.opensearch.sql.directquery.transport.format.DirectQueryRequestConverter;
34+
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionRequest;
35+
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
36+
import org.opensearch.sql.directquery.validator.DirectQueryRequestValidator;
37+
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
38+
import org.opensearch.sql.opensearch.util.RestRequestUtil;
39+
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
40+
import org.opensearch.transport.client.node.NodeClient;
41+
42+
@RequiredArgsConstructor
43+
public class RestDirectQueryManagementAction extends BaseRestHandler {
44+
45+
public static final String DIRECT_QUERY_ACTIONS = "direct_query_actions";
46+
public static final String BASE_DIRECT_QUERY_ACTION_URL =
47+
"/_plugins/_directquery/_query/{dataSources}";
48+
49+
private static final Logger LOG = LogManager.getLogger(RestDirectQueryManagementAction.class);
50+
private final OpenSearchSettings settings;
51+
52+
@Override
53+
public String getName() {
54+
return DIRECT_QUERY_ACTIONS;
55+
}
56+
57+
@Override
58+
public List<Route> routes() {
59+
return ImmutableList.of(new Route(POST, BASE_DIRECT_QUERY_ACTION_URL));
60+
}
61+
62+
@Override
63+
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) {
64+
// This line consumes the dataSources parameter from the path
65+
String dataSources = restRequest.param("dataSources");
66+
67+
// Also consume all other request parameters to prevent similar errors
68+
RestRequestUtil.consumeAllRequestParameters(restRequest);
69+
70+
if (!dataSourcesEnabled()) {
71+
return dataSourcesDisabledError(restRequest);
72+
}
73+
74+
if (Objects.requireNonNull(restRequest.method()) == POST) {
75+
return executeDirectQueryRequest(restRequest, nodeClient, dataSources);
76+
}
77+
return restChannel ->
78+
restChannel.sendResponse(
79+
new BytesRestResponse(
80+
RestStatus.METHOD_NOT_ALLOWED, String.valueOf(restRequest.method())));
81+
}
82+
83+
private RestChannelConsumer executeDirectQueryRequest(
84+
RestRequest restRequest, NodeClient nodeClient, String dataSources) {
85+
return restChannel -> {
86+
try {
87+
ExecuteDirectQueryRequest directQueryRequest =
88+
DirectQueryRequestConverter.fromXContentParser(restRequest.contentParser());
89+
90+
// If the datasource is not specified in the payload, use the path parameter
91+
if (directQueryRequest.getDataSources() == null) {
92+
directQueryRequest.setDataSources(dataSources);
93+
}
94+
95+
// Generate a session ID if one is not provided in the request
96+
if (directQueryRequest.getSessionId() == null) {
97+
directQueryRequest.setSessionId(java.util.UUID.randomUUID().toString());
98+
}
99+
100+
// Validate request using the dedicated validator
101+
DirectQueryRequestValidator.validateRequest(directQueryRequest);
102+
103+
Scheduler.schedule(
104+
nodeClient,
105+
() ->
106+
nodeClient.execute(
107+
TransportExecuteDirectQueryRequestAction.ACTION_TYPE,
108+
new ExecuteDirectQueryActionRequest(directQueryRequest),
109+
new ActionListener<>() {
110+
@Override
111+
public void onResponse(ExecuteDirectQueryActionResponse response) {
112+
// Format the response here at the REST layer using JsonResponseFormatter
113+
String formattedResponse = formatDirectQueryResponse(response);
114+
restChannel.sendResponse(
115+
new BytesRestResponse(
116+
RestStatus.OK,
117+
"application/json; charset=UTF-8",
118+
formattedResponse));
119+
}
120+
121+
@Override
122+
public void onFailure(Exception e) {
123+
handleException(e, restChannel, restRequest.method());
124+
}
125+
}));
126+
} catch (Exception e) {
127+
handleException(e, restChannel, restRequest.method());
128+
}
129+
};
130+
}
131+
132+
/** Format the direct query response using JsonResponseFormatter */
133+
private String formatDirectQueryResponse(ExecuteDirectQueryActionResponse response) {
134+
try {
135+
// Create a formatter that converts the response to a pretty JSON format
136+
return new JsonResponseFormatter<ExecuteDirectQueryActionResponse>(
137+
JsonResponseFormatter.Style.PRETTY) {
138+
@Override
139+
protected Object buildJsonObject(ExecuteDirectQueryActionResponse response) {
140+
// Create a response object with the fields we want to expose
141+
return new DirectQueryResult(
142+
response.getQueryId(), response.getResults(), response.getSessionId());
143+
}
144+
}.format(response);
145+
} catch (Exception e) {
146+
LOG.error("Error formatting direct query response", e);
147+
return "{\"error\": \"" + e.getMessage() + "\"}";
148+
}
149+
}
150+
151+
/** Simple class to represent the formatted response */
152+
@Getter
153+
private static class DirectQueryResult {
154+
private final String queryId;
155+
private final Map<String, Object> results;
156+
private final String sessionId;
157+
158+
public DirectQueryResult(String queryId, Map<String, ?> results, String sessionId) {
159+
this.queryId = queryId;
160+
this.results = (Map<String, Object>) results;
161+
this.sessionId = sessionId;
162+
}
163+
}
164+
165+
private void handleException(
166+
Exception e, RestChannel restChannel, RestRequest.Method requestMethod) {
167+
if (e instanceof OpenSearchException) {
168+
OpenSearchException exception = (OpenSearchException) e;
169+
reportError(restChannel, exception, exception.status());
170+
} else {
171+
LOG.error("Error happened during request handling", e);
172+
if (isClientError(e)) {
173+
reportError(restChannel, e, BAD_REQUEST);
174+
} else {
175+
reportError(restChannel, e, INTERNAL_SERVER_ERROR);
176+
}
177+
}
178+
}
179+
180+
private void reportError(final RestChannel channel, final Exception e, final RestStatus status) {
181+
channel.sendResponse(
182+
new BytesRestResponse(status, new ErrorMessage(e, status.getStatus()).toString()));
183+
}
184+
185+
private static boolean isClientError(Exception e) {
186+
return e instanceof IllegalArgumentException
187+
|| e instanceof IllegalStateException
188+
|| e instanceof DataSourceClientException
189+
|| e instanceof IllegalAccessException;
190+
}
191+
192+
private boolean dataSourcesEnabled() {
193+
return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED);
194+
}
195+
196+
private RestChannelConsumer dataSourcesDisabledError(RestRequest request) {
197+
RestRequestUtil.consumeAllRequestParameters(request);
198+
199+
return channel -> {
200+
reportError(
201+
channel,
202+
new IllegalAccessException(
203+
String.format("%s setting is false", Settings.Key.DATASOURCES_ENABLED.getKeyValue())),
204+
BAD_REQUEST);
205+
};
206+
}
207+
}

0 commit comments

Comments
 (0)