Skip to content

Commit ed04dec

Browse files
[7.9][ML] Monitor reindex response in DF analytics (#60911) (#60967)
Examines the reindex response in order to report potential problems that occurred during the reindexing phase of data frame analytics jobs. Backport of #60911
1 parent 069ad78 commit ed04dec

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2020
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
2121
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
22+
import org.elasticsearch.action.bulk.BulkItemResponse;
2223
import org.elasticsearch.action.support.ContextPreservingActionListener;
2324
import org.elasticsearch.client.Client;
2425
import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -214,6 +215,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
214215
// Reindexing is complete; start analytics
215216
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
216217
reindexResponse -> {
218+
217219
// If the reindex task is canceled, this listener is called.
218220
// Consequently, we should not signal reindex completion.
219221
if (task.isStopping()) {
@@ -222,7 +224,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
222224
task.markAsCompleted();
223225
return;
224226
}
227+
225228
task.setReindexingTaskId(null);
229+
230+
Exception reindexError = getReindexError(task.getParams().getId(), reindexResponse);
231+
if (reindexError != null) {
232+
task.markAsFailed(reindexError);
233+
return;
234+
}
235+
236+
LOGGER.debug("[{}] Reindex completed; created [{}]; retries [{}]", task.getParams().getId(),
237+
reindexResponse.getCreated(), reindexResponse.getBulkRetries());
238+
226239
auditor.info(
227240
config.getId(),
228241
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
@@ -251,6 +264,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
251264
reindexRequest.setDestIndex(config.getDest().getIndex());
252265
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
253266
reindexRequest.setParentTask(task.getParentTaskId());
267+
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
254268

255269
final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext();
256270
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
@@ -295,6 +309,26 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
295309
new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener);
296310
}
297311

312+
private static Exception getReindexError(String jobId, BulkByScrollResponse reindexResponse) {
313+
if (reindexResponse.getBulkFailures().isEmpty() == false) {
314+
LOGGER.error("[{}] reindexing encountered {} failures", jobId,
315+
reindexResponse.getBulkFailures().size());
316+
for (BulkItemResponse.Failure failure : reindexResponse.getBulkFailures()) {
317+
LOGGER.error("[{}] reindexing failure: {}", jobId, failure);
318+
}
319+
return ExceptionsHelper.serverError("reindexing encountered " + reindexResponse.getBulkFailures().size() + " failures");
320+
}
321+
if (reindexResponse.getReasonCancelled() != null) {
322+
LOGGER.error("[{}] reindex task got cancelled with reason [{}]", jobId, reindexResponse.getReasonCancelled());
323+
return ExceptionsHelper.serverError("reindex task got cancelled with reason [" + reindexResponse.getReasonCancelled() + "]");
324+
}
325+
if (reindexResponse.isTimedOut()) {
326+
LOGGER.error("[{}] reindex task timed out after [{}]", jobId, reindexResponse.getTook().getStringRep());
327+
return ExceptionsHelper.serverError("reindex task timed out after [" + reindexResponse.getTook().getStringRep() + "]");
328+
}
329+
return null;
330+
}
331+
298332
private static boolean isTaskCancelledException(Exception error) {
299333
return ExceptionsHelper.unwrapCause(error) instanceof TaskCancelledException
300334
|| ExceptionsHelper.unwrapCause(error.getCause()) instanceof TaskCancelledException;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ private SearchRequestBuilder buildDataSummarySearchRequestBuilder() {
282282
}
283283

284284
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
285+
.setAllowPartialSearchResults(false)
285286
.setIndices(context.indices)
286287
.setSize(0)
287288
.setQuery(summaryQuery)

0 commit comments

Comments
 (0)