Skip to content

Commit 6d964d0

Browse files
[7.x][ML] Fix race condition when force stopping DF analytics job (#57680) (#58031)
When we force delete a DF analytics job, we currently first force stop it and then we proceed with deleting the job config. This may result in logging errors if the job config is deleted before it is retrieved while the job is starting. Instead of force stopping the job, it would make more sense to try to stop the job gracefully first. So we now try that out first. If normal stop fails, then we resort to force stopping the job to ensure we can go through with the delete. In addition, this commit introduces `timeout` for the delete action and makes use of it in the child requests. Backport of #57680
1 parent 3350028 commit 6d964d0

File tree

11 files changed

+114
-22
lines changed

11 files changed

+114
-22
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,9 @@ static Request deleteDataFrameAnalytics(DeleteDataFrameAnalyticsRequest deleteRe
712712
if (deleteRequest.getForce() != null) {
713713
params.putParam("force", Boolean.toString(deleteRequest.getForce()));
714714
}
715+
if (deleteRequest.getTimeout() != null) {
716+
params.withTimeout(deleteRequest.getTimeout());
717+
}
715718
request.addParameters(params.asMap());
716719

717720
return request;

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.client.Validatable;
2323
import org.elasticsearch.client.ValidationException;
24+
import org.elasticsearch.common.unit.TimeValue;
2425

2526
import java.util.Objects;
2627
import java.util.Optional;
@@ -32,6 +33,7 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
3233

3334
private final String id;
3435
private Boolean force;
36+
private TimeValue timeout;
3537

3638
public DeleteDataFrameAnalyticsRequest(String id) {
3739
this.id = id;
@@ -55,6 +57,19 @@ public void setForce(Boolean force) {
5557
this.force = force;
5658
}
5759

60+
public TimeValue getTimeout() {
61+
return timeout;
62+
}
63+
64+
/**
65+
* Sets the time to wait until the job is deleted.
66+
*
67+
* @param timeout The time to wait until the job is deleted.
68+
*/
69+
public void setTimeout(TimeValue timeout) {
70+
this.timeout = timeout;
71+
}
72+
5873
@Override
5974
public Optional<ValidationException> validate() {
6075
if (id == null) {
@@ -69,11 +84,13 @@ public boolean equals(Object o) {
6984
if (o == null || getClass() != o.getClass()) return false;
7085

7186
DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o;
72-
return Objects.equals(id, other.id) && Objects.equals(force, other.force);
87+
return Objects.equals(id, other.id)
88+
&& Objects.equals(force, other.force)
89+
&& Objects.equals(timeout, other.timeout);
7390
}
7491

7592
@Override
7693
public int hashCode() {
77-
return Objects.hash(id, force);
94+
return Objects.hash(id, force, timeout);
7895
}
7996
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,9 +816,21 @@ public void testDeleteDataFrameAnalytics() {
816816
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
817817
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
818818
assertNull(request.getEntity());
819+
assertThat(request.getParameters().size(), equalTo(1));
819820
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
820821
}
821822

823+
public void testDeleteDataFrameAnalytics_WithTimeout() {
824+
DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10));
825+
deleteRequest.setTimeout(TimeValue.timeValueSeconds(10));
826+
Request request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest);
827+
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
828+
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
829+
assertNull(request.getEntity());
830+
assertThat(request.getParameters().size(), equalTo(1));
831+
assertEquals(request.getParameters().get("timeout"), "10s");
832+
}
833+
822834
public void testEvaluateDataFrame() throws IOException {
823835
EvaluateDataFrameRequest evaluateRequest = EvaluateDataFrameRequestTests.createRandom();
824836
Request request = MLRequestConverters.evaluateDataFrame(evaluateRequest);

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3089,9 +3089,10 @@ public void testDeleteDataFrameAnalytics() throws Exception {
30893089
DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1>
30903090
// end::delete-data-frame-analytics-request
30913091

3092-
//tag::delete-data-frame-analytics-request-force
3092+
//tag::delete-data-frame-analytics-request-options
30933093
request.setForce(false); // <1>
3094-
//end::delete-data-frame-analytics-request-force
3094+
request.setTimeout(TimeValue.timeValueMinutes(1)); // <2>
3095+
//end::delete-data-frame-analytics-request-options
30953096

30963097
// tag::delete-data-frame-analytics-execute
30973098
AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT);

docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ The following arguments are optional:
2727

2828
["source","java",subs="attributes,callouts,macros"]
2929
---------------------------------------------------
30-
include-tagged::{doc-tests-file}[{api}-request-force]
30+
include-tagged::{doc-tests-file}[{api}-request-options]
3131
---------------------------------------------------
3232
<1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping
3333
and deleting the job. Defaults to `false`.
34+
<2> Use to set the time to wait until the job is deleted. Defaults to 1 minute.
3435

3536
include::../execution.asciidoc[]
3637

docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
4343
(Optional, boolean) If `true`, it deletes a job that is not stopped; this method is
4444
quicker than stopping and deleting the job.
4545

46+
`timeout`::
47+
(Optional, <<time-units,time units>>) The time to wait for the job to be deleted.
48+
Defaults to 1 minute.
49+
50+
4651

4752
[[ml-delete-dfanalytics-example]]
4853
==== {api-examples-title}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
import org.elasticsearch.common.ParseField;
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1920
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2021

2122
import java.io.IOException;
2223
import java.util.Objects;
24+
import java.util.concurrent.TimeUnit;
2325

2426
public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedResponse> {
2527

@@ -33,6 +35,10 @@ private DeleteDataFrameAnalyticsAction() {
3335
public static class Request extends AcknowledgedRequest<Request> {
3436

3537
public static final ParseField FORCE = new ParseField("force");
38+
public static final ParseField TIMEOUT = new ParseField("timeout");
39+
40+
// Default timeout matches that of delete by query
41+
private static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
3642

3743
private String id;
3844
private boolean force;
@@ -47,9 +53,12 @@ public Request(StreamInput in) throws IOException {
4753
}
4854
}
4955

50-
public Request() {}
56+
public Request() {
57+
timeout(DEFAULT_TIMEOUT);
58+
}
5159

5260
public Request(String id) {
61+
this();
5362
this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID);
5463
}
5564

@@ -75,7 +84,9 @@ public boolean equals(Object o) {
7584
if (this == o) return true;
7685
if (o == null || getClass() != o.getClass()) return false;
7786
DeleteDataFrameAnalyticsAction.Request request = (DeleteDataFrameAnalyticsAction.Request) o;
78-
return Objects.equals(id, request.id) && force == request.force;
87+
return Objects.equals(id, request.id)
88+
&& force == request.force
89+
&& Objects.equals(timeout, request.timeout);
7990
}
8091

8192
@Override
@@ -89,7 +100,7 @@ public void writeTo(StreamOutput out) throws IOException {
89100

90101
@Override
91102
public int hashCode() {
92-
return Objects.hash(id, force);
103+
return Objects.hash(id, force, timeout);
93104
}
94105
}
95106

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsActionRequestTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.core.ml.action;
77

88
import org.elasticsearch.common.io.stream.Writeable;
9+
import org.elasticsearch.common.unit.TimeValue;
910
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1011
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction.Request;
1112

@@ -18,6 +19,9 @@ public class DeleteDataFrameAnalyticsActionRequestTests extends AbstractWireSeri
1819
protected Request createTestInstance() {
1920
Request request = new Request(randomAlphaOfLength(10));
2021
request.setForce(randomBoolean());
22+
if (randomBoolean()) {
23+
request.timeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
24+
}
2125
return request;
2226
}
2327

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.DocWriteResponse;
1213
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.cluster.service.ClusterService;
2728
import org.elasticsearch.common.inject.Inject;
2829
import org.elasticsearch.common.io.stream.StreamInput;
30+
import org.elasticsearch.common.unit.TimeValue;
2931
import org.elasticsearch.index.query.QueryBuilder;
3032
import org.elasticsearch.index.query.QueryBuilders;
3133
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@@ -109,33 +111,59 @@ protected void masterOperation(DeleteDataFrameAnalyticsAction.Request request, C
109111
@Override
110112
protected void masterOperation(Task task, DeleteDataFrameAnalyticsAction.Request request, ClusterState state,
111113
ActionListener<AcknowledgedResponse> listener) {
112-
String id = request.getId();
113114
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
114115
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
115116

116117
if (request.isForce()) {
117-
forceDelete(parentTaskClient, id, listener);
118+
forceDelete(parentTaskClient, request, listener);
118119
} else {
119-
normalDelete(parentTaskClient, state, id, listener);
120+
normalDelete(parentTaskClient, state, request, listener);
120121
}
121122
}
122123

123-
private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id,
124+
private void forceDelete(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
124125
ActionListener<AcknowledgedResponse> listener) {
125-
logger.debug("[{}] Force deleting data frame analytics job", id);
126+
logger.debug("[{}] Force deleting data frame analytics job", request.getId());
126127

127128
ActionListener<StopDataFrameAnalyticsAction.Response> stopListener = ActionListener.wrap(
128-
stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener),
129+
stopResponse -> normalDelete(parentTaskClient, clusterService.state(), request, listener),
129130
listener::onFailure
130131
);
131132

132-
StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
133-
request.setForce(true);
134-
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener);
133+
stopJob(parentTaskClient, request, stopListener);
135134
}
136135

137-
private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id,
138-
ActionListener<AcknowledgedResponse> listener) {
136+
private void stopJob(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
137+
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
138+
// We first try to stop the job normally. Normal stop returns after the job was stopped.
139+
// If that fails then we proceed to force stopping which returns as soon as the persistent task is removed.
140+
// If we just did force stopping, then there is a chance we proceed to delete the config while it's
141+
// still used from the running task which results in logging errors.
142+
143+
StopDataFrameAnalyticsAction.Request stopRequest = new StopDataFrameAnalyticsAction.Request(request.getId());
144+
stopRequest.setTimeout(request.timeout());
145+
146+
ActionListener<StopDataFrameAnalyticsAction.Response> normalStopListener = ActionListener.wrap(
147+
listener::onResponse,
148+
normalStopFailure -> {
149+
stopRequest.setForce(true);
150+
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, ActionListener.wrap(
151+
listener::onResponse,
152+
forceStopFailure -> {
153+
logger.error(new ParameterizedMessage("[{}] Failed to stop normally", request.getId()), normalStopFailure);
154+
logger.error(new ParameterizedMessage("[{}] Failed to stop forcefully", request.getId()), forceStopFailure);
155+
listener.onFailure(forceStopFailure);
156+
}
157+
));
158+
}
159+
);
160+
161+
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, normalStopListener);
162+
}
163+
164+
private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state,
165+
DeleteDataFrameAnalyticsAction.Request request, ActionListener<AcknowledgedResponse> listener) {
166+
String id = request.getId();
139167
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
140168
DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks);
141169
if (taskState != DataFrameAnalyticsState.STOPPED) {
@@ -178,14 +206,14 @@ private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterSta
178206
logger.warn("[{}] DBQ failure: {}", id, failure);
179207
}
180208
}
181-
deleteStats(parentTaskClient, id, deleteStatsHandler);
209+
deleteStats(parentTaskClient, id, request.timeout(), deleteStatsHandler);
182210
},
183211
listener::onFailure
184212
);
185213

186214
// Step 2. Delete state
187215
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
188-
config -> deleteState(parentTaskClient, config, deleteStateHandler),
216+
config -> deleteState(parentTaskClient, config, request.timeout(), deleteStateHandler),
189217
listener::onFailure
190218
);
191219

@@ -214,6 +242,7 @@ private void deleteConfig(ParentTaskAssigningClient parentTaskClient, String id,
214242

215243
private void deleteState(ParentTaskAssigningClient parentTaskClient,
216244
DataFrameAnalyticsConfig config,
245+
TimeValue timeout,
217246
ActionListener<BulkByScrollResponse> listener) {
218247
List<String> ids = new ArrayList<>();
219248
ids.add(StoredProgress.documentId(config.getId()));
@@ -224,29 +253,33 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient,
224253
parentTaskClient,
225254
AnomalyDetectorsIndex.jobStateIndexPattern(),
226255
QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])),
256+
timeout,
227257
listener
228258
);
229259
}
230260

231261
private void deleteStats(ParentTaskAssigningClient parentTaskClient,
232262
String jobId,
263+
TimeValue timeout,
233264
ActionListener<BulkByScrollResponse> listener) {
234265
executeDeleteByQuery(
235266
parentTaskClient,
236267
MlStatsIndex.indexPattern(),
237268
QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
269+
timeout,
238270
listener
239271
);
240272
}
241273

242-
private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query,
274+
private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, TimeValue timeout,
243275
ActionListener<BulkByScrollResponse> listener) {
244276
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
245277
request.setQuery(query);
246278
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
247279
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
248280
request.setAbortOnVersionConflict(false);
249281
request.setRefresh(true);
282+
request.setTimeout(timeout);
250283
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
251284
}
252285

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
3737
String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
3838
DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
3939
request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
40+
request.timeout(restRequest.paramAsTime(DeleteDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), request.timeout()));
4041
return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
4142
}
4243
}

x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
"type":"boolean",
2727
"description":"True if the job should be forcefully deleted",
2828
"default":false
29+
},
30+
"timeout":{
31+
"type":"time",
32+
"description":"Controls the time to wait until a job is deleted. Defaults to 1 minute"
2933
}
3034
}
3135
}

0 commit comments

Comments
 (0)