20
20
import org .apache .kafka .common .Uuid ;
21
21
import org .apache .kafka .common .compress .Compression ;
22
22
import org .apache .kafka .common .config .TopicConfig ;
23
+ import org .apache .kafka .common .errors .InvalidRequestException ;
23
24
import org .apache .kafka .common .errors .NotCoordinatorException ;
25
+ import org .apache .kafka .common .errors .StreamsInvalidTopologyException ;
26
+ import org .apache .kafka .common .errors .UnsupportedAssignorException ;
24
27
import org .apache .kafka .common .internals .Plugin ;
25
28
import org .apache .kafka .common .internals .Topic ;
26
29
import org .apache .kafka .common .message .ConsumerGroupDescribeResponseData ;
67
70
import org .apache .kafka .common .requests .DescribeShareGroupOffsetsRequest ;
68
71
import org .apache .kafka .common .requests .OffsetCommitRequest ;
69
72
import org .apache .kafka .common .requests .ShareGroupDescribeRequest ;
73
+ import org .apache .kafka .common .requests .ShareGroupHeartbeatRequest ;
70
74
import org .apache .kafka .common .requests .StreamsGroupDescribeRequest ;
71
75
import org .apache .kafka .common .requests .TransactionResult ;
72
76
import org .apache .kafka .common .requests .TxnOffsetCommitRequest ;
83
87
import org .apache .kafka .coordinator .common .runtime .CoordinatorShardBuilderSupplier ;
84
88
import org .apache .kafka .coordinator .common .runtime .MultiThreadedEventProcessor ;
85
89
import org .apache .kafka .coordinator .common .runtime .PartitionWriter ;
90
+ import org .apache .kafka .coordinator .group .api .assignor .ConsumerGroupPartitionAssignor ;
86
91
import org .apache .kafka .coordinator .group .metrics .GroupCoordinatorMetrics ;
87
92
import org .apache .kafka .coordinator .group .streams .StreamsGroupHeartbeatResult ;
88
93
import org .apache .kafka .image .MetadataDelta ;
126
131
import java .util .function .IntSupplier ;
127
132
import java .util .stream .Collectors ;
128
133
134
+ import static org .apache .kafka .common .requests .ConsumerGroupHeartbeatRequest .CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ;
135
+ import static org .apache .kafka .common .requests .ConsumerGroupHeartbeatRequest .LEAVE_GROUP_MEMBER_EPOCH ;
136
+ import static org .apache .kafka .common .requests .ConsumerGroupHeartbeatRequest .LEAVE_GROUP_STATIC_MEMBER_EPOCH ;
129
137
import static org .apache .kafka .coordinator .common .runtime .CoordinatorOperationExceptionHelper .handleOperationException ;
138
+ import static org .apache .kafka .coordinator .group .Utils .throwIfEmptyString ;
139
+ import static org .apache .kafka .coordinator .group .Utils .throwIfNotEmptyCollection ;
140
+ import static org .apache .kafka .coordinator .group .Utils .throwIfNotNull ;
141
+ import static org .apache .kafka .coordinator .group .Utils .throwIfNull ;
130
142
131
143
/**
132
144
* The group coordinator service.
@@ -298,6 +310,11 @@ public GroupCoordinatorService build() {
298
310
*/
299
311
private final AtomicBoolean isActive = new AtomicBoolean (false );
300
312
313
+ /**
314
+ * The set of supported consumer group assignors.
315
+ */
316
+ private final Set <String > consumerGroupAssignors ;
317
+
301
318
/**
302
319
* The number of partitions of the __consumer_offsets topics. This is provided
303
320
* when the component is started.
@@ -336,6 +353,11 @@ public GroupCoordinatorService build() {
336
353
this .groupConfigManager = groupConfigManager ;
337
354
this .persister = persister ;
338
355
this .timer = timer ;
356
+ this .consumerGroupAssignors = config
357
+ .consumerGroupAssignors ()
358
+ .stream ()
359
+ .map (ConsumerGroupPartitionAssignor ::name )
360
+ .collect (Collectors .toSet ());
339
361
}
340
362
341
363
/**
@@ -367,6 +389,55 @@ public int partitionFor(
367
389
return Utils .abs (groupId .hashCode ()) % numPartitions ;
368
390
}
369
391
392
+ /**
393
+ * Validates the request.
394
+ *
395
+ * @param request The request to validate.
396
+ * @param apiVersion The version of ConsumerGroupHeartbeat RPC
397
+ * @throws InvalidRequestException if the request is not valid.
398
+ * @throws UnsupportedAssignorException if the assignor is not supported.
399
+ */
400
+ private void throwIfConsumerGroupHeartbeatRequestIsInvalid (
401
+ ConsumerGroupHeartbeatRequestData request ,
402
+ int apiVersion
403
+ ) throws InvalidRequestException , UnsupportedAssignorException {
404
+ if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
405
+ request .memberEpoch () > 0 ||
406
+ request .memberEpoch () == LEAVE_GROUP_MEMBER_EPOCH
407
+ ) {
408
+ throwIfEmptyString (request .memberId (), "MemberId can't be empty." );
409
+ }
410
+
411
+ throwIfEmptyString (request .groupId (), "GroupId can't be empty." );
412
+ throwIfEmptyString (request .instanceId (), "InstanceId can't be empty." );
413
+ throwIfEmptyString (request .rackId (), "RackId can't be empty." );
414
+
415
+ if (request .memberEpoch () == 0 ) {
416
+ if (request .rebalanceTimeoutMs () == -1 ) {
417
+ throw new InvalidRequestException ("RebalanceTimeoutMs must be provided in first request." );
418
+ }
419
+ if (request .topicPartitions () == null || !request .topicPartitions ().isEmpty ()) {
420
+ throw new InvalidRequestException ("TopicPartitions must be empty when (re-)joining." );
421
+ }
422
+ // We accept members joining with an empty list of names or an empty regex. It basically
423
+ // means that they are not subscribed to any topics, but they are part of the group.
424
+ if (request .subscribedTopicNames () == null && request .subscribedTopicRegex () == null ) {
425
+ throw new InvalidRequestException ("Either SubscribedTopicNames or SubscribedTopicRegex must" +
426
+ " be non-null when (re-)joining." );
427
+ }
428
+ } else if (request .memberEpoch () == LEAVE_GROUP_STATIC_MEMBER_EPOCH ) {
429
+ throwIfNull (request .instanceId (), "InstanceId can't be null." );
430
+ } else if (request .memberEpoch () < LEAVE_GROUP_STATIC_MEMBER_EPOCH ) {
431
+ throw new InvalidRequestException ("MemberEpoch is invalid." );
432
+ }
433
+
434
+ if (request .serverAssignor () != null && !consumerGroupAssignors .contains (request .serverAssignor ())) {
435
+ throw new UnsupportedAssignorException ("ServerAssignor " + request .serverAssignor ()
436
+ + " is not supported. Supported assignors: " + String .join (", " , consumerGroupAssignors )
437
+ + "." );
438
+ }
439
+ }
440
+
370
441
/**
371
442
* See {@link GroupCoordinator#consumerGroupHeartbeat(AuthorizableRequestContext, ConsumerGroupHeartbeatRequestData)}.
372
443
*/
@@ -381,6 +452,16 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
381
452
);
382
453
}
383
454
455
+ try {
456
+ throwIfConsumerGroupHeartbeatRequestIsInvalid (request , context .requestVersion ());
457
+ } catch (Throwable ex ) {
458
+ ApiError apiError = ApiError .fromThrowable (ex );
459
+ return CompletableFuture .completedFuture (new ConsumerGroupHeartbeatResponseData ()
460
+ .setErrorCode (apiError .error ().code ())
461
+ .setErrorMessage (apiError .message ())
462
+ );
463
+ }
464
+
384
465
return runtime .scheduleWriteOperation (
385
466
"consumer-group-heartbeat" ,
386
467
topicPartitionFor (request .groupId ()),
@@ -397,6 +478,65 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
397
478
));
398
479
}
399
480
481
+ private static void throwIfInvalidTopology (
482
+ StreamsGroupHeartbeatRequestData .Topology topology
483
+ ) throws StreamsInvalidTopologyException {
484
+ for (StreamsGroupHeartbeatRequestData .Subtopology subtopology : topology .subtopologies ()) {
485
+ for (StreamsGroupHeartbeatRequestData .TopicInfo topicInfo : subtopology .stateChangelogTopics ()) {
486
+ if (topicInfo .partitions () != 0 ) {
487
+ throw new StreamsInvalidTopologyException (String .format (
488
+ "Changelog topic %s must have an undefined partition count, but it is set to %d." ,
489
+ topicInfo .name (), topicInfo .partitions ()
490
+ ));
491
+ }
492
+ }
493
+ }
494
+ }
495
+
496
+ /**
497
+ * Validates the request.
498
+ *
499
+ * @param request The request to validate.
500
+ * @throws InvalidRequestException if the request is not valid.
501
+ * @throws UnsupportedAssignorException if the assignor is not supported.
502
+ */
503
+ private static void throwIfStreamsGroupHeartbeatRequestIsInvalid (
504
+ StreamsGroupHeartbeatRequestData request
505
+ ) throws InvalidRequestException {
506
+ throwIfEmptyString (request .memberId (), "MemberId can't be empty." );
507
+ throwIfEmptyString (request .groupId (), "GroupId can't be empty." );
508
+ throwIfEmptyString (request .instanceId (), "InstanceId can't be empty." );
509
+ throwIfEmptyString (request .rackId (), "RackId can't be empty." );
510
+
511
+ if (request .memberEpoch () == 0 ) {
512
+ if (request .rebalanceTimeoutMs () == -1 ) {
513
+ throw new InvalidRequestException ("RebalanceTimeoutMs must be provided in first request." );
514
+ }
515
+ throwIfNotEmptyCollection (request .activeTasks (), "ActiveTasks must be empty when (re-)joining." );
516
+ throwIfNotEmptyCollection (request .standbyTasks (), "StandbyTasks must be empty when (re-)joining." );
517
+ throwIfNotEmptyCollection (request .warmupTasks (), "WarmupTasks must be empty when (re-)joining." );
518
+ throwIfNull (request .topology (), "Topology must be non-null when (re-)joining." );
519
+ if (request .topology () != null ) {
520
+ throwIfInvalidTopology (request .topology ());
521
+ }
522
+ } else if (request .memberEpoch () == LEAVE_GROUP_STATIC_MEMBER_EPOCH ) {
523
+ throwIfNull (request .instanceId (), "InstanceId can't be null." );
524
+ } else if (request .memberEpoch () < LEAVE_GROUP_STATIC_MEMBER_EPOCH ) {
525
+ throw new InvalidRequestException (String .format ("MemberEpoch is %d, but must be greater than or equal to -2." ,
526
+ request .memberEpoch ()));
527
+ }
528
+
529
+ if (request .activeTasks () != null || request .standbyTasks () != null || request .warmupTasks () != null ) {
530
+ throwIfNull (request .activeTasks (), "If one task-type is non-null, all must be non-null." );
531
+ throwIfNull (request .standbyTasks (), "If one task-type is non-null, all must be non-null." );
532
+ throwIfNull (request .warmupTasks (), "If one task-type is non-null, all must be non-null." );
533
+ }
534
+
535
+ if (request .memberEpoch () != 0 ) {
536
+ throwIfNotNull (request .topology (), "Topology can only be provided when (re-)joining." );
537
+ }
538
+ }
539
+
400
540
/**
401
541
* See
402
542
* {@link GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, StreamsGroupHeartbeatRequestData)}.
@@ -415,6 +555,20 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
415
555
);
416
556
}
417
557
558
+ try {
559
+ throwIfStreamsGroupHeartbeatRequestIsInvalid (request );
560
+ } catch (Throwable ex ) {
561
+ ApiError apiError = ApiError .fromThrowable (ex );
562
+ return CompletableFuture .completedFuture (
563
+ new StreamsGroupHeartbeatResult (
564
+ new StreamsGroupHeartbeatResponseData ()
565
+ .setErrorCode (apiError .error ().code ())
566
+ .setErrorMessage (apiError .message ()),
567
+ Map .of ()
568
+ )
569
+ );
570
+ }
571
+
418
572
return runtime .scheduleWriteOperation (
419
573
"streams-group-heartbeat" ,
420
574
topicPartitionFor (request .groupId ()),
@@ -435,6 +589,28 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
435
589
));
436
590
}
437
591
592
+ /**
593
+ * Validates the ShareGroupHeartbeat request.
594
+ *
595
+ * @param request The request to validate.
596
+ * @throws InvalidRequestException if the request is not valid.
597
+ */
598
+ private static void throwIfShareGroupHeartbeatRequestIsInvalid (
599
+ ShareGroupHeartbeatRequestData request
600
+ ) throws InvalidRequestException {
601
+ throwIfEmptyString (request .memberId (), "MemberId can't be empty." );
602
+ throwIfEmptyString (request .groupId (), "GroupId can't be empty." );
603
+ throwIfEmptyString (request .rackId (), "RackId can't be empty." );
604
+
605
+ if (request .memberEpoch () == 0 ) {
606
+ if (request .subscribedTopicNames () == null || request .subscribedTopicNames ().isEmpty ()) {
607
+ throw new InvalidRequestException ("SubscribedTopicNames must be set in first request." );
608
+ }
609
+ } else if (request .memberEpoch () < ShareGroupHeartbeatRequest .LEAVE_GROUP_MEMBER_EPOCH ) {
610
+ throw new InvalidRequestException ("MemberEpoch is invalid." );
611
+ }
612
+ }
613
+
438
614
/**
439
615
* See {@link GroupCoordinator#shareGroupHeartbeat(AuthorizableRequestContext, ShareGroupHeartbeatRequestData)}.
440
616
*/
@@ -449,6 +625,16 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
449
625
);
450
626
}
451
627
628
+ try {
629
+ throwIfShareGroupHeartbeatRequestIsInvalid (request );
630
+ } catch (Throwable ex ) {
631
+ ApiError apiError = ApiError .fromThrowable (ex );
632
+ return CompletableFuture .completedFuture (new ShareGroupHeartbeatResponseData ()
633
+ .setErrorCode (apiError .error ().code ())
634
+ .setErrorMessage (apiError .message ())
635
+ );
636
+ }
637
+
452
638
return runtime .scheduleWriteOperation (
453
639
"share-group-heartbeat" ,
454
640
topicPartitionFor (request .groupId ()),
0 commit comments