diff --git a/common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java similarity index 78% rename from common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java rename to common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java index 2f8c22c059..372dbae387 100644 --- a/common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java +++ b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java @@ -7,13 +7,14 @@ package org.opensearch.sql.common.utils; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.apache.logging.log4j.ThreadContext; /** - * Utility class for generating/accessing the request id from logging context. + * Utility class for recording and accessing context for the query being executed. */ -public class LogUtils { +public class QueryContext { /** * The key of the request id in the context map. @@ -29,8 +30,10 @@ public class LogUtils { * call this method twice on the same thread within the lifetime of the request. *

*/ - public static void addRequestId() { - ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString()); + public static String addRequestId() { + var id = UUID.randomUUID().toString(); + ThreadContext.put(REQUEST_ID_KEY, id); + return id; } /** @@ -38,8 +41,11 @@ public static void addRequestId() { * @return the current request id from {@link ThreadContext}. */ public static String getRequestId() { - final String requestId = ThreadContext.get(REQUEST_ID_KEY); - return requestId; + var id = ThreadContext.get(REQUEST_ID_KEY); + if (null == id) { + id = addRequestId(); + } + return id; } /** @@ -57,7 +63,7 @@ public static Runnable withCurrentContext(final Runnable task) { }; } - private LogUtils() { + private QueryContext() { throw new AssertionError( getClass().getCanonicalName() + " is a utility class and must not be initialized"); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java index e6406e8b3e..4ad6e55777 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java @@ -19,13 +19,13 @@ import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.query.QueryAction; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; -import org.opensearch.sql.legacy.utils.LogUtils; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; @@ -73,13 +73,13 @@ public void execute(Client client, Map params, QueryAction query if (isBlockingAction(queryAction) && isRunningInTransportThread()) { if (LOG.isDebugEnabled()) { LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]", - LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName()); + QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName()); } async(client, params, queryAction, channel); } else { if (LOG.isDebugEnabled()) { LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]", - LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName()); + QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName()); } doExecuteWithTimeMeasured(client, params, queryAction, channel); } @@ -110,18 +110,18 @@ private void async(Client client, Map params, QueryAction queryA doExecuteWithTimeMeasured(client, params, queryAction, channel); } catch (IOException | SqlParseException | OpenSearchException e) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(), e.getMessage()); channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); } catch (IllegalStateException e) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(), e.getMessage()); channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, "Memory circuit is broken.")); } catch (Throwable t) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(), t.getMessage()); channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, String.valueOf(t.getMessage()))); @@ -132,7 +132,7 @@ private void async(Client client, Map params, QueryAction queryA // Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions threadPool.schedule( - LogUtils.withCurrentContext(runnable), + QueryContext.withCurrentContext(runnable), new TimeValue(0L), SQL_WORKER_THREAD_POOL_NAME ); @@ -152,7 +152,7 @@ private void doExecuteWithTimeMeasured(Client client, Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime); int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG); if (elapsed.getSeconds() >= slowLogThreshold) { - LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis()); + LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis()); } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java index 0dc1fe301f..7bb6421502 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java @@ -17,11 +17,11 @@ import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; -import org.opensearch.sql.legacy.utils.LogUtils; import org.opensearch.threadpool.ThreadPool; public class CursorAsyncRestExecutor { @@ -57,20 +57,20 @@ private void async(Client client, Map params, RestChannel channe doExecuteWithTimeMeasured(client, params, channel); } catch (IOException e) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(), e.getMessage()); e.printStackTrace(); channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); } catch (IllegalStateException e) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(), e.getMessage()); e.printStackTrace(); channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, "Memory circuit is broken.")); } catch (Throwable t) { Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(), + LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(), t.getMessage()); t.printStackTrace(); channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, @@ -82,7 +82,7 @@ private void async(Client client, Map params, RestChannel channe // Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions threadPool.schedule( - LogUtils.withCurrentContext(runnable), + QueryContext.withCurrentContext(runnable), new TimeValue(0L), SQL_WORKER_THREAD_POOL_NAME ); @@ -101,7 +101,7 @@ private void doExecuteWithTimeMeasured(Client client, Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime); int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG); if (elapsed.getSeconds() >= slowLogThreshold) { - LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis()); + LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis()); } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 10d9dab0fa..6f7579c9c7 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -35,6 +35,7 @@ import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.legacy.antlr.OpenSearchLegacySqlAnalyzer; @@ -60,7 +61,6 @@ import org.opensearch.sql.legacy.request.SqlRequestParam; import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException; import org.opensearch.sql.legacy.utils.JsonPrettyFormatter; -import org.opensearch.sql.legacy.utils.LogUtils; import org.opensearch.sql.legacy.utils.QueryDataAnonymizer; import org.opensearch.sql.sql.domain.SQLQueryRequest; @@ -123,7 +123,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment(); Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment(); - LogUtils.addRequestId(); + QueryContext.addRequestId(); try { if (!isSQLFeatureEnabled()) { @@ -137,12 +137,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli if (isExplainRequest(request)) { throw new IllegalArgumentException("Invalid request. Cannot explain cursor"); } else { - LOG.info("[{}] Cursor request {}: {}", LogUtils.getRequestId(), request.uri(), sqlRequest.cursor()); + LOG.info("[{}] Cursor request {}: {}", QueryContext.getRequestId(), request.uri(), sqlRequest.cursor()); return channel -> handleCursorRequest(request, sqlRequest.cursor(), client, channel); } } - LOG.info("[{}] Incoming request {}: {}", LogUtils.getRequestId(), request.uri(), + LOG.info("[{}] Incoming request {}: {}", QueryContext.getRequestId(), request.uri(), QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); Format format = SqlRequestParam.getFormat(request.params()); @@ -152,11 +152,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli sqlRequest.getSql(), request.path(), request.params()); RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client); if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) { - LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId()); + LOG.info("[{}] Request is handled by new SQL query engine", QueryContext.getRequestId()); return result; } LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine", - LogUtils.getRequestId(), newSqlRequest); + QueryContext.getRequestId(), newSqlRequest); final QueryAction queryAction = explainRequest(client, sqlRequest, format); return channel -> executeSqlRequest(request, queryAction, client, channel); @@ -182,10 +182,10 @@ private void handleCursorRequest(final RestRequest request, final String cursor, private static void logAndPublishMetrics(final Exception e) { if (isClientError(e)) { - LOG.error(LogUtils.getRequestId() + " Client side error during query execution", e); + LOG.error(QueryContext.getRequestId() + " Client side error during query execution", e); Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment(); } else { - LOG.error(LogUtils.getRequestId() + " Server side error during query execution", e); + LOG.error(QueryContext.getRequestId() + " Server side error during query execution", e); Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java index 70ec21c3fa..5b48ef6710 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java @@ -22,9 +22,9 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory; import org.opensearch.sql.legacy.metrics.Metrics; -import org.opensearch.sql.legacy.utils.LogUtils; /** * Currently this interface is for node level. @@ -67,7 +67,7 @@ public List replacedRoutes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - LogUtils.addRequestId(); + QueryContext.addRequestId(); try { return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java b/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java deleted file mode 100644 index 4830dd4413..0000000000 --- a/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - - -package org.opensearch.sql.legacy.utils; - -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import org.apache.logging.log4j.ThreadContext; - -/** - * Utility class for generating/accessing the request id from logging context. - */ -public class LogUtils { - - /** - * The key of the request id in the context map - */ - private static final String REQUEST_ID_KEY = "request_id"; - - private static final String EMPTY_ID = "ID"; - - /** - * Generates a random UUID and adds to the {@link ThreadContext} as the request id. - *

- * Note: If a request id already present, this method will overwrite it with a new - * one. This is to pre-vent re-using the same request id for different requests in - * case the same thread handles both of them. But this also means one should not - * call this method twice on the same thread within the lifetime of the request. - *

- */ - public static void addRequestId() { - - ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString()); - } - - /** - * @return the current request id from {@link ThreadContext} - */ - public static String getRequestId() { - return Optional.ofNullable(ThreadContext.get(REQUEST_ID_KEY)).orElseGet(() -> EMPTY_ID); - } - - /** - * Wraps a given instance of {@link Runnable} into a new one which gets all the - * entries from current ThreadContext map. - * - * @param task the instance of Runnable to wrap - * @return the new task - */ - public static Runnable withCurrentContext(final Runnable task) { - - final Map currentContext = ThreadContext.getImmutableContext(); - return () -> { - ThreadContext.putAll(currentContext); - task.run(); - }; - } - - private LogUtils() { - - throw new AssertionError(getClass().getCanonicalName() + " is a utility class and must not be initialized"); - } -} diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java similarity index 79% rename from legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java rename to legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java index 564ce8c9ea..55b78af0d7 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java @@ -8,15 +8,15 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import org.apache.logging.log4j.ThreadContext; import org.junit.After; import org.junit.Assert; import org.junit.Test; -import org.opensearch.sql.legacy.utils.LogUtils; +import org.opensearch.sql.common.utils.QueryContext; -public class LogUtilsTest { +public class QueryContextTest { private static final String REQUEST_ID_KEY = "request_id"; @@ -30,7 +30,7 @@ public void cleanUpContext() { public void addRequestId() { Assert.assertNull(ThreadContext.get(REQUEST_ID_KEY)); - LogUtils.addRequestId(); + QueryContext.addRequestId(); final String requestId = ThreadContext.get(REQUEST_ID_KEY); Assert.assertNotNull(requestId); } @@ -38,16 +38,16 @@ public void addRequestId() { @Test public void addRequestId_alreadyExists() { - LogUtils.addRequestId(); + QueryContext.addRequestId(); final String requestId = ThreadContext.get(REQUEST_ID_KEY); - LogUtils.addRequestId(); + QueryContext.addRequestId(); final String requestId2 = ThreadContext.get(REQUEST_ID_KEY); Assert.assertThat(requestId2, not(equalTo(requestId))); } @Test public void getRequestId_doesNotExist() { - assertEquals("ID", LogUtils.getRequestId()); + assertNotNull(QueryContext.getRequestId()); } @Test @@ -55,7 +55,7 @@ public void getRequestId() { final String test_request_id = "test_id_111"; ThreadContext.put(REQUEST_ID_KEY, test_request_id); - final String requestId = LogUtils.getRequestId(); + final String requestId = QueryContext.getRequestId(); Assert.assertThat(requestId, equalTo(test_request_id)); } @@ -68,6 +68,6 @@ public void withCurrentContext() throws InterruptedException { }; ThreadContext.put("test11", "value11"); ThreadContext.put("test22", "value11"); - new Thread(LogUtils.withCurrentContext(task)).join(); + new Thread(QueryContext.withCurrentContext(task)).join(); } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java index 6b5cbd4135..5b9c792c7d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java @@ -22,9 +22,9 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory; import org.opensearch.sql.legacy.metrics.Metrics; -import org.opensearch.sql.legacy.utils.LogUtils; /** * PPL Node level status. @@ -67,7 +67,7 @@ public List replacedRoutes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - LogUtils.addRequestId(); + QueryContext.addRequestId(); try { return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java index 0d8ca66cc6..14d06dfc71 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java @@ -28,8 +28,8 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory; -import org.opensearch.sql.legacy.utils.LogUtils; public class RestQuerySettingsAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(RestQuerySettingsAction.class); @@ -74,7 +74,7 @@ public List replacedRoutes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - LogUtils.addRequestId(); + QueryContext.addRequestId(); final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); clusterUpdateSettingsRequest.timeout(request.paramAsTime( diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index e4699b6f9f..31317c1962 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -20,7 +20,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.utils.LogUtils; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; @@ -77,7 +77,7 @@ protected void doExecute( Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); - LogUtils.addRequestId(); + QueryContext.addRequestId(); PPLService pplService = createPPLService(client); TransportPPLQueryRequest transportRequest = TransportPPLQueryRequest.fromActionRequest(request); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index a1a831c7cd..866326f562 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -16,7 +16,7 @@ import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.common.utils.LogUtils; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.expression.DSL; @@ -84,7 +84,8 @@ private PhysicalPlan plan(PPLQueryRequest request) { UnresolvedPlan ast = cst.accept( new AstBuilder(new AstExpressionBuilder(), request.getRequest())); - LOG.info("[{}] Incoming request {}", LogUtils.getRequestId(), anonymizer.anonymizeData(ast)); + LOG.info("[{}] Incoming request {}", QueryContext.getRequestId(), + anonymizer.anonymizeData(ast)); // 2.Analyze abstract syntax to generate logical plan LogicalPlan logicalPlan = analyzer.analyze(UnresolvedPlanHelper.addSelectAll(ast),