Skip to content

Commit dabe1d7

Browse files
Replace 2x LogUtils by QueryContext (#747) (#753)
Signed-off-by: Yury Fridlyand <[email protected]> (cherry picked from commit deececb) Co-authored-by: Yury-Fridlyand <[email protected]>
1 parent 7a4145e commit dabe1d7

File tree

11 files changed

+55
-115
lines changed

11 files changed

+55
-115
lines changed

common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java renamed to common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
package org.opensearch.sql.common.utils;
88

99
import java.util.Map;
10+
import java.util.Optional;
1011
import java.util.UUID;
1112
import org.apache.logging.log4j.ThreadContext;
1213

1314
/**
14-
* Utility class for generating/accessing the request id from logging context.
15+
* Utility class for recording and accessing context for the query being executed.
1516
*/
16-
public class LogUtils {
17+
public class QueryContext {
1718

1819
/**
1920
* The key of the request id in the context map.
@@ -29,17 +30,22 @@ public class LogUtils {
2930
* call this method twice on the same thread within the lifetime of the request.
3031
* </p>
3132
*/
32-
public static void addRequestId() {
33-
ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString());
33+
public static String addRequestId() {
34+
var id = UUID.randomUUID().toString();
35+
ThreadContext.put(REQUEST_ID_KEY, id);
36+
return id;
3437
}
3538

3639
/**
3740
* Get RequestID.
3841
* @return the current request id from {@link ThreadContext}.
3942
*/
4043
public static String getRequestId() {
41-
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
42-
return requestId;
44+
var id = ThreadContext.get(REQUEST_ID_KEY);
45+
if (null == id) {
46+
id = addRequestId();
47+
}
48+
return id;
4349
}
4450

4551
/**
@@ -57,7 +63,7 @@ public static Runnable withCurrentContext(final Runnable task) {
5763
};
5864
}
5965

60-
private LogUtils() {
66+
private QueryContext() {
6167
throw new AssertionError(
6268
getClass().getCanonicalName() + " is a utility class and must not be initialized");
6369
}

legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import org.opensearch.rest.RestChannel;
2020
import org.opensearch.rest.RestStatus;
2121
import org.opensearch.sql.common.setting.Settings;
22+
import org.opensearch.sql.common.utils.QueryContext;
2223
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
2324
import org.opensearch.sql.legacy.exception.SqlParseException;
2425
import org.opensearch.sql.legacy.metrics.MetricName;
2526
import org.opensearch.sql.legacy.metrics.Metrics;
2627
import org.opensearch.sql.legacy.query.QueryAction;
2728
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
28-
import org.opensearch.sql.legacy.utils.LogUtils;
2929
import org.opensearch.threadpool.ThreadPool;
3030
import org.opensearch.transport.Transports;
3131

@@ -73,13 +73,13 @@ public void execute(Client client, Map<String, String> params, QueryAction query
7373
if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
7474
if (LOG.isDebugEnabled()) {
7575
LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]",
76-
LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
76+
QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
7777
}
7878
async(client, params, queryAction, channel);
7979
} else {
8080
if (LOG.isDebugEnabled()) {
8181
LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]",
82-
LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
82+
QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
8383
}
8484
doExecuteWithTimeMeasured(client, params, queryAction, channel);
8585
}
@@ -110,18 +110,18 @@ private void async(Client client, Map<String, String> params, QueryAction queryA
110110
doExecuteWithTimeMeasured(client, params, queryAction, channel);
111111
} catch (IOException | SqlParseException | OpenSearchException e) {
112112
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
113-
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
113+
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
114114
e.getMessage());
115115
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
116116
} catch (IllegalStateException e) {
117117
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
118-
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
118+
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
119119
e.getMessage());
120120
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
121121
"Memory circuit is broken."));
122122
} catch (Throwable t) {
123123
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
124-
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
124+
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
125125
t.getMessage());
126126
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
127127
String.valueOf(t.getMessage())));
@@ -132,7 +132,7 @@ private void async(Client client, Map<String, String> params, QueryAction queryA
132132

133133
// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
134134
threadPool.schedule(
135-
LogUtils.withCurrentContext(runnable),
135+
QueryContext.withCurrentContext(runnable),
136136
new TimeValue(0L),
137137
SQL_WORKER_THREAD_POOL_NAME
138138
);
@@ -152,7 +152,7 @@ private void doExecuteWithTimeMeasured(Client client,
152152
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
153153
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
154154
if (elapsed.getSeconds() >= slowLogThreshold) {
155-
LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
155+
LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
156156
}
157157
}
158158
}

legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
import org.opensearch.rest.RestChannel;
1818
import org.opensearch.rest.RestStatus;
1919
import org.opensearch.sql.common.setting.Settings;
20+
import org.opensearch.sql.common.utils.QueryContext;
2021
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
2122
import org.opensearch.sql.legacy.metrics.MetricName;
2223
import org.opensearch.sql.legacy.metrics.Metrics;
2324
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
24-
import org.opensearch.sql.legacy.utils.LogUtils;
2525
import org.opensearch.threadpool.ThreadPool;
2626

2727
public class CursorAsyncRestExecutor {
@@ -57,20 +57,20 @@ private void async(Client client, Map<String, String> params, RestChannel channe
5757
doExecuteWithTimeMeasured(client, params, channel);
5858
} catch (IOException e) {
5959
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
60-
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
60+
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
6161
e.getMessage());
6262
e.printStackTrace();
6363
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
6464
} catch (IllegalStateException e) {
6565
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
66-
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
66+
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
6767
e.getMessage());
6868
e.printStackTrace();
6969
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
7070
"Memory circuit is broken."));
7171
} catch (Throwable t) {
7272
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
73-
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
73+
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
7474
t.getMessage());
7575
t.printStackTrace();
7676
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
@@ -82,7 +82,7 @@ private void async(Client client, Map<String, String> params, RestChannel channe
8282

8383
// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
8484
threadPool.schedule(
85-
LogUtils.withCurrentContext(runnable),
85+
QueryContext.withCurrentContext(runnable),
8686
new TimeValue(0L),
8787
SQL_WORKER_THREAD_POOL_NAME
8888
);
@@ -101,7 +101,7 @@ private void doExecuteWithTimeMeasured(Client client,
101101
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
102102
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
103103
if (elapsed.getSeconds() >= slowLogThreshold) {
104-
LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
104+
LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
105105
}
106106
}
107107
}

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.rest.RestRequest;
3636
import org.opensearch.rest.RestStatus;
3737
import org.opensearch.sql.common.antlr.SyntaxCheckException;
38+
import org.opensearch.sql.common.utils.QueryContext;
3839
import org.opensearch.sql.exception.ExpressionEvaluationException;
3940
import org.opensearch.sql.exception.SemanticCheckException;
4041
import org.opensearch.sql.legacy.antlr.OpenSearchLegacySqlAnalyzer;
@@ -60,7 +61,6 @@
6061
import org.opensearch.sql.legacy.request.SqlRequestParam;
6162
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;
6263
import org.opensearch.sql.legacy.utils.JsonPrettyFormatter;
63-
import org.opensearch.sql.legacy.utils.LogUtils;
6464
import org.opensearch.sql.legacy.utils.QueryDataAnonymizer;
6565
import org.opensearch.sql.sql.domain.SQLQueryRequest;
6666

@@ -123,7 +123,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
123123
Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment();
124124
Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment();
125125

126-
LogUtils.addRequestId();
126+
QueryContext.addRequestId();
127127

128128
try {
129129
if (!isSQLFeatureEnabled()) {
@@ -137,12 +137,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
137137
if (isExplainRequest(request)) {
138138
throw new IllegalArgumentException("Invalid request. Cannot explain cursor");
139139
} else {
140-
LOG.info("[{}] Cursor request {}: {}", LogUtils.getRequestId(), request.uri(), sqlRequest.cursor());
140+
LOG.info("[{}] Cursor request {}: {}", QueryContext.getRequestId(), request.uri(), sqlRequest.cursor());
141141
return channel -> handleCursorRequest(request, sqlRequest.cursor(), client, channel);
142142
}
143143
}
144144

145-
LOG.info("[{}] Incoming request {}: {}", LogUtils.getRequestId(), request.uri(),
145+
LOG.info("[{}] Incoming request {}: {}", QueryContext.getRequestId(), request.uri(),
146146
QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
147147

148148
Format format = SqlRequestParam.getFormat(request.params());
@@ -152,11 +152,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
152152
sqlRequest.getSql(), request.path(), request.params());
153153
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
154154
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
155-
LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
155+
LOG.info("[{}] Request is handled by new SQL query engine", QueryContext.getRequestId());
156156
return result;
157157
}
158158
LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine",
159-
LogUtils.getRequestId(), newSqlRequest);
159+
QueryContext.getRequestId(), newSqlRequest);
160160

161161
final QueryAction queryAction = explainRequest(client, sqlRequest, format);
162162
return channel -> executeSqlRequest(request, queryAction, client, channel);
@@ -182,10 +182,10 @@ private void handleCursorRequest(final RestRequest request, final String cursor,
182182

183183
private static void logAndPublishMetrics(final Exception e) {
184184
if (isClientError(e)) {
185-
LOG.error(LogUtils.getRequestId() + " Client side error during query execution", e);
185+
LOG.error(QueryContext.getRequestId() + " Client side error during query execution", e);
186186
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment();
187187
} else {
188-
LOG.error(LogUtils.getRequestId() + " Server side error during query execution", e);
188+
LOG.error(QueryContext.getRequestId() + " Server side error during query execution", e);
189189
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
190190
}
191191
}

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.opensearch.rest.RestController;
2323
import org.opensearch.rest.RestRequest;
2424
import org.opensearch.rest.RestStatus;
25+
import org.opensearch.sql.common.utils.QueryContext;
2526
import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory;
2627
import org.opensearch.sql.legacy.metrics.Metrics;
27-
import org.opensearch.sql.legacy.utils.LogUtils;
2828

2929
/**
3030
* Currently this interface is for node level.
@@ -67,7 +67,7 @@ public List<ReplacedRoute> replacedRoutes() {
6767
@Override
6868
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
6969

70-
LogUtils.addRequestId();
70+
QueryContext.addRequestId();
7171

7272
try {
7373
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,

legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java renamed to legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88

99
import static org.hamcrest.Matchers.equalTo;
1010
import static org.hamcrest.Matchers.not;
11-
import static org.junit.Assert.assertEquals;
11+
import static org.junit.Assert.assertNotNull;
1212

1313
import org.apache.logging.log4j.ThreadContext;
1414
import org.junit.After;
1515
import org.junit.Assert;
1616
import org.junit.Test;
17-
import org.opensearch.sql.legacy.utils.LogUtils;
17+
import org.opensearch.sql.common.utils.QueryContext;
1818

19-
public class LogUtilsTest {
19+
public class QueryContextTest {
2020

2121
private static final String REQUEST_ID_KEY = "request_id";
2222

@@ -30,32 +30,32 @@ public void cleanUpContext() {
3030
public void addRequestId() {
3131

3232
Assert.assertNull(ThreadContext.get(REQUEST_ID_KEY));
33-
LogUtils.addRequestId();
33+
QueryContext.addRequestId();
3434
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
3535
Assert.assertNotNull(requestId);
3636
}
3737

3838
@Test
3939
public void addRequestId_alreadyExists() {
4040

41-
LogUtils.addRequestId();
41+
QueryContext.addRequestId();
4242
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
43-
LogUtils.addRequestId();
43+
QueryContext.addRequestId();
4444
final String requestId2 = ThreadContext.get(REQUEST_ID_KEY);
4545
Assert.assertThat(requestId2, not(equalTo(requestId)));
4646
}
4747

4848
@Test
4949
public void getRequestId_doesNotExist() {
50-
assertEquals("ID", LogUtils.getRequestId());
50+
assertNotNull(QueryContext.getRequestId());
5151
}
5252

5353
@Test
5454
public void getRequestId() {
5555

5656
final String test_request_id = "test_id_111";
5757
ThreadContext.put(REQUEST_ID_KEY, test_request_id);
58-
final String requestId = LogUtils.getRequestId();
58+
final String requestId = QueryContext.getRequestId();
5959
Assert.assertThat(requestId, equalTo(test_request_id));
6060
}
6161

@@ -68,6 +68,6 @@ public void withCurrentContext() throws InterruptedException {
6868
};
6969
ThreadContext.put("test11", "value11");
7070
ThreadContext.put("test22", "value11");
71-
new Thread(LogUtils.withCurrentContext(task)).join();
71+
new Thread(QueryContext.withCurrentContext(task)).join();
7272
}
7373
}

0 commit comments

Comments
 (0)