Skip to content

Commit d6133f6

Browse files
authored
KAFKA-18988: Connect Multiversion Support (Updates to status and metrics) (#17988)
Reviewers: Greg Harris <[email protected]>
1 parent 62fe528 commit d6133f6

32 files changed

+556
-102
lines changed

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -203,83 +203,91 @@ public boolean isReady() {
203203
@Override
204204
public void onStartup(String connector) {
205205
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
206-
workerId, generation()));
206+
workerId, generation(), worker.connectorVersion(connector)));
207207
}
208208

209209
@Override
210210
public void onStop(String connector) {
211211
statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.STOPPED,
212-
workerId, generation()));
212+
workerId, generation(), worker.connectorVersion(connector)));
213213
}
214214

215215
@Override
216216
public void onPause(String connector) {
217217
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.PAUSED,
218-
workerId, generation()));
218+
workerId, generation(), worker.connectorVersion(connector)));
219219
}
220220

221221
@Override
222222
public void onResume(String connector) {
223223
statusBackingStore.put(new ConnectorStatus(connector, TaskStatus.State.RUNNING,
224-
workerId, generation()));
224+
workerId, generation(), worker.connectorVersion(connector)));
225225
}
226226

227227
@Override
228228
public void onShutdown(String connector) {
229229
statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
230-
workerId, generation()));
230+
workerId, generation(), worker.connectorVersion(connector)));
231231
}
232232

233233
@Override
234234
public void onFailure(String connector, Throwable cause) {
235235
statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
236-
trace(cause), workerId, generation()));
236+
trace(cause), workerId, generation(), worker.connectorVersion(connector)));
237237
}
238238

239239
@Override
240240
public void onStartup(ConnectorTaskId id) {
241-
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
241+
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null,
242+
worker.taskVersion(id)));
242243
}
243244

244245
@Override
245246
public void onFailure(ConnectorTaskId id, Throwable cause) {
246-
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
247+
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause),
248+
worker.taskVersion(id)));
247249
}
248250

249251
@Override
250252
public void onShutdown(ConnectorTaskId id) {
251-
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
253+
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation(), null,
254+
worker.taskVersion(id)));
252255
}
253256

254257
@Override
255258
public void onResume(ConnectorTaskId id) {
256-
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
259+
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null,
260+
worker.taskVersion(id)));
257261
}
258262

259263
@Override
260264
public void onPause(ConnectorTaskId id) {
261-
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation()));
265+
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation(), null,
266+
worker.taskVersion(id)));
262267
}
263268

264269
@Override
265270
public void onDeletion(String connector) {
266271
for (TaskStatus status : statusBackingStore.getAll(connector))
267272
onDeletion(status.id());
268-
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
273+
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation(),
274+
worker.connectorVersion(connector)));
269275
}
270276

271277
@Override
272278
public void onDeletion(ConnectorTaskId id) {
273-
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
279+
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation(), null,
280+
worker.taskVersion(id)));
274281
}
275282

276283
public void onRestart(String connector) {
277284
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RESTARTING,
278-
workerId, generation()));
285+
workerId, generation(), worker.connectorVersion(connector)));
279286
}
280287

281288
public void onRestart(ConnectorTaskId id) {
282-
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation()));
289+
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation(), null,
290+
worker.taskVersion(id)));
283291
}
284292

285293
@Override
@@ -347,12 +355,12 @@ public ConnectorStateInfo connectorStatus(String connName) {
347355
Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
348356

349357
ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
350-
connector.state().toString(), connector.workerId(), connector.trace());
358+
connector.state().toString(), connector.workerId(), connector.trace(), connector.version());
351359
List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
352360

353361
for (TaskStatus status : tasks) {
354362
taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
355-
status.state().toString(), status.workerId(), status.trace()));
363+
status.state().toString(), status.workerId(), status.trace(), status.version()));
356364
}
357365

358366
Collections.sort(taskStates);
@@ -388,7 +396,7 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
388396
throw new NotFoundException("No status found for task " + id);
389397

390398
return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
391-
status.workerId(), status.trace());
399+
status.workerId(), status.trace(), status.version());
392400
}
393401

394402
@Override
@@ -626,7 +634,8 @@ public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
626634
ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState(
627635
connectorState.toString(),
628636
connectorStatus.workerId(),
629-
connectorStatus.trace()
637+
connectorStatus.trace(),
638+
connectorStatus.version()
630639
);
631640

632641
// Collect the task states, If requested, mark the task as restarting
@@ -638,7 +647,8 @@ public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
638647
taskStatus.id().task(),
639648
taskState.toString(),
640649
taskStatus.workerId(),
641-
taskStatus.trace()
650+
taskStatus.trace(),
651+
taskStatus.version()
642652
);
643653
})
644654
.collect(Collectors.toList());

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,29 @@ public enum State {
3434
private final State state;
3535
private final String trace;
3636
private final String workerId;
37+
private final String version;
3738
private final int generation;
3839

3940
public AbstractStatus(T id,
4041
State state,
4142
String workerId,
4243
int generation,
43-
String trace) {
44+
String trace,
45+
String version) {
4446
this.id = id;
4547
this.state = state;
4648
this.workerId = workerId;
4749
this.generation = generation;
4850
this.trace = trace;
51+
this.version = version;
52+
}
53+
54+
public AbstractStatus(T id,
55+
State state,
56+
String workerId,
57+
int generation,
58+
String trace) {
59+
this(id, state, workerId, generation, trace, null);
4960
}
5061

5162
public T id() {
@@ -68,12 +79,17 @@ public int generation() {
6879
return generation;
6980
}
7081

82+
public String version() {
83+
return version;
84+
}
85+
7186
@Override
7287
public String toString() {
7388
return "Status{" +
7489
"id=" + id +
7590
", state=" + state +
7691
", workerId='" + workerId + '\'' +
92+
", version='" + version + '\'' +
7793
", generation=" + generation +
7894
'}';
7995
}
@@ -89,7 +105,8 @@ public boolean equals(Object o) {
89105
&& Objects.equals(id, that.id)
90106
&& state == that.state
91107
&& Objects.equals(trace, that.trace)
92-
&& Objects.equals(workerId, that.workerId);
108+
&& Objects.equals(workerId, that.workerId)
109+
&& Objects.equals(version, that.version);
93110
}
94111

95112
@Override
@@ -98,6 +115,7 @@ public int hashCode() {
98115
result = 31 * result + (state != null ? state.hashCode() : 0);
99116
result = 31 * result + (trace != null ? trace.hashCode() : 0);
100117
result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
118+
result = 31 * result + (version != null ? version.hashCode() : 0);
101119
result = 31 * result + generation;
102120
return result;
103121
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ protected abstract void producerSendFailed(
203203
private final boolean topicTrackingEnabled;
204204
private final TopicCreation topicCreation;
205205
private final Executor closeExecutor;
206+
private final String version;
206207

207208
// Visible for testing
208209
List<SourceRecord> toSend;
@@ -236,11 +237,12 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
236237
StatusBackingStore statusBackingStore,
237238
Executor closeExecutor,
238239
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
240+
TaskPluginsMetadata pluginsMetadata,
239241
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
240242

241243
super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
242244
retryWithToleranceOperator, transformationChain, errorReportersSupplier,
243-
time, statusBackingStore, pluginLoaderSwapper);
245+
time, statusBackingStore, pluginsMetadata, pluginLoaderSwapper);
244246

245247
this.workerConfig = workerConfig;
246248
this.task = task;
@@ -258,6 +260,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
258260
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
259261
this.topicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
260262
this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
263+
this.version = task.version();
261264
}
262265

263266
@Override
@@ -391,6 +394,11 @@ public void execute() {
391394
finalOffsetCommit(false);
392395
}
393396

397+
@Override
398+
public String taskVersion() {
399+
return version;
400+
}
401+
394402
/**
395403
* Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
396404
* be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class ConnectMetricsRegistry {
3737
public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
3838
public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics";
3939
public static final String TASK_ERROR_HANDLING_GROUP_NAME = "task-error-metrics";
40+
public static final String TRANSFORMS_GROUP = "connector-transform-metrics";
41+
public static final String PREDICATES_GROUP = "connector-predicate-metrics";
42+
public static final String TRANSFORM_TAG_NAME = "transform";
43+
public static final String PREDICATE_TAG_NAME = "predicate";
4044

4145
private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
4246
public final MetricNameTemplate connectorStatus;
@@ -59,6 +63,17 @@ public class ConnectMetricsRegistry {
5963
public final MetricNameTemplate taskBatchSizeAvg;
6064
public final MetricNameTemplate taskCommitFailurePercentage;
6165
public final MetricNameTemplate taskCommitSuccessPercentage;
66+
public final MetricNameTemplate taskConnectorClass;
67+
public final MetricNameTemplate taskConnectorClassVersion;
68+
public final MetricNameTemplate taskConnectorType;
69+
public final MetricNameTemplate taskClass;
70+
public final MetricNameTemplate taskVersion;
71+
public final MetricNameTemplate taskKeyConverterClass;
72+
public final MetricNameTemplate taskValueConverterClass;
73+
public final MetricNameTemplate taskKeyConverterVersion;
74+
public final MetricNameTemplate taskValueConverterVersion;
75+
public final MetricNameTemplate taskHeaderConverterClass;
76+
public final MetricNameTemplate taskHeaderConverterVersion;
6277
public final MetricNameTemplate sourceRecordPollRate;
6378
public final MetricNameTemplate sourceRecordPollTotal;
6479
public final MetricNameTemplate sourceRecordWriteRate;
@@ -115,6 +130,10 @@ public class ConnectMetricsRegistry {
115130
public final MetricNameTemplate transactionSizeMin;
116131
public final MetricNameTemplate transactionSizeMax;
117132
public final MetricNameTemplate transactionSizeAvg;
133+
public final MetricNameTemplate transformClass;
134+
public final MetricNameTemplate transformVersion;
135+
public final MetricNameTemplate predicateClass;
136+
public final MetricNameTemplate predicateVersion;
118137

119138
public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
120139

@@ -164,6 +183,43 @@ public ConnectMetricsRegistry(Set<String> tags) {
164183
taskCommitSuccessPercentage = createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME,
165184
"The average percentage of this task's offset commit attempts that succeeded.",
166185
workerTaskTags);
186+
taskConnectorClass = createTemplate("connector-class", TASK_GROUP_NAME, "The name of the connector class.", workerTaskTags);
187+
taskConnectorClassVersion = createTemplate("connector-version", TASK_GROUP_NAME,
188+
"The version of the connector class, as reported by the connector.", workerTaskTags);
189+
taskConnectorType = createTemplate("connector-type", TASK_GROUP_NAME, "The type of the connector. One of 'source' or 'sink'.",
190+
workerTaskTags);
191+
taskClass = createTemplate("task-class", TASK_GROUP_NAME, "The class name of the task.", workerTaskTags);
192+
taskVersion = createTemplate("task-version", TASK_GROUP_NAME, "The version of the task.", workerTaskTags);
193+
taskKeyConverterClass = createTemplate("key-converter-class", TASK_GROUP_NAME,
194+
"The fully qualified class name from key.converter", workerTaskTags);
195+
taskValueConverterClass = createTemplate("value-converter-class", TASK_GROUP_NAME,
196+
"The fully qualified class name from value.converter", workerTaskTags);
197+
taskKeyConverterVersion = createTemplate("key-converter-version", TASK_GROUP_NAME,
198+
"The version instantiated for key.converter. May be undefined", workerTaskTags);
199+
taskValueConverterVersion = createTemplate("value-converter-version", TASK_GROUP_NAME,
200+
"The version instantiated for value.converter. May be undefined", workerTaskTags);
201+
taskHeaderConverterClass = createTemplate("header-converter-class", TASK_GROUP_NAME,
202+
"The fully qualified class name from header.converter", workerTaskTags);
203+
taskHeaderConverterVersion = createTemplate("header-converter-version", TASK_GROUP_NAME,
204+
"The version instantiated for header.converter. May be undefined", workerTaskTags);
205+
206+
/* Transformation Metrics */
207+
Set<String> transformTags = new LinkedHashSet<>(tags);
208+
transformTags.addAll(workerTaskTags);
209+
transformTags.add(TRANSFORM_TAG_NAME);
210+
transformClass = createTemplate("transform-class", TRANSFORMS_GROUP,
211+
"The class name of the transformation class", transformTags);
212+
transformVersion = createTemplate("transform-version", TRANSFORMS_GROUP,
213+
"The version of the transformation class", transformTags);
214+
215+
/* Predicate Metrics */
216+
Set<String> predicateTags = new LinkedHashSet<>(tags);
217+
predicateTags.addAll(workerTaskTags);
218+
predicateTags.add(PREDICATE_TAG_NAME);
219+
predicateClass = createTemplate("predicate-class", PREDICATES_GROUP,
220+
"The class name of the predicate class", predicateTags);
221+
predicateVersion = createTemplate("predicate-version", PREDICATES_GROUP,
222+
"The version of the predicate class", predicateTags);
167223

168224
/* Source worker task level */
169225
Set<String> sourceTaskTags = new LinkedHashSet<>(tags);
@@ -426,4 +482,20 @@ public String workerRebalanceGroupName() {
426482
public String taskErrorHandlingGroupName() {
427483
return TASK_ERROR_HANDLING_GROUP_NAME;
428484
}
485+
486+
public String transformsGroupName() {
487+
return TRANSFORMS_GROUP;
488+
}
489+
490+
public String transformsTagName() {
491+
return TRANSFORM_TAG_NAME;
492+
}
493+
494+
public String predicatesGroupName() {
495+
return PREDICATES_GROUP;
496+
}
497+
498+
public String predicateTagName() {
499+
return PREDICATE_TAG_NAME;
500+
}
429501
}

0 commit comments

Comments
 (0)