kafka.admin.ReassignPartitionsCommand is a command-line tool for generating, executing and verifying a custom partition (re)assignment configuration (specified in a reassignment JSON file).
Action | Description |
---|---|
--execute | Executes the reassignment as specified by the --reassignment-json-file option |
--generate | |
--verify | Verifies whether the reassignment completed as specified by the --reassignment-json-file option. If a throttle is engaged for the specified replicas and the rebalance has completed, the throttle is removed |
ReassignPartitionsCommand can be executed using the kafka-reassign-partitions shell script (i.e. bin/kafka-reassign-partitions.sh or bin\windows\kafka-reassign-partitions.bat).
$ ./bin/kafka-reassign-partitions.sh
This command moves topic partitions between replicas.
Option | Description |
---|---|
--reassignment-json-file | The format to use is as follows: {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"]}], "version": 1}. Note that "log_dirs" is optional. When specified, its length must equal the length of the replicas list. Each value in this list can be either "any" or the absolute path of a log directory on the broker. If an absolute log directory path is specified, it is currently required that the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later. |
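The log_dirs length rule above can be checked before a file is handed to the tool. The following is a minimal sketch in Python, not part of Kafka: the helper name check_log_dirs and the sample log directory path are hypothetical, and only the length rule is checked.

```python
import json


def check_log_dirs(reassignment_json: str) -> list:
    """Return a list of problems; an empty list means the length rule holds.

    Hypothetical helper, not part of Kafka. It only checks that "log_dirs",
    when present, has as many entries as "replicas".
    """
    problems = []
    doc = json.loads(reassignment_json)
    for entry in doc.get("partitions", []):
        replicas = entry.get("replicas", [])
        log_dirs = entry.get("log_dirs")  # optional field
        if log_dirs is not None and len(log_dirs) != len(replicas):
            problems.append(
                f'{entry.get("topic")}-{entry.get("partition")}: '
                f"{len(log_dirs)} log_dirs for {len(replicas)} replicas"
            )
    return problems


# No "log_dirs" at all: allowed, because the field is optional.
GOOD = '{"partitions": [{"topic": "my-topic", "partition": 1, "replicas": [1]}], "version": 1}'

# Two replicas but one log directory (the path is a made-up example).
BAD = ('{"partitions": [{"topic": "my-topic", "partition": 1, '
       '"replicas": [1, 2], "log_dirs": ["/tmp/kafka-logs-1"]}], "version": 1}')

if __name__ == "__main__":
    print(check_log_dirs(GOOD))
    print(check_log_dirs(BAD))
```

Returning a list of problems rather than raising on the first one lets a caller report every offending partition in a single pass.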
$ ./bin/kafka-topics.sh --list --zookeeper :2181
my-topic
$ cat reassign-partitions.json
{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 1,
      "replicas": [
        1
      ]
    }
  ],
  "version": 1
}
$ ./bin/kafka-reassign-partitions.sh \
--generate \
--zookeeper :2181 \
--topics-to-move-json-file reassign-partitions.json \
--broker-list 0
$ ./bin/kafka-reassign-partitions.sh \
--verify \
--zookeeper :2181 \
--reassignment-json-file reassign-partitions.json
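A reassignment file such as reassign-partitions.json does not have to be written by hand. As a rough sketch (Python, outside of Kafka itself; build_reassignment is a hypothetical helper name), a mapping of (topic, partition) to broker ids can be rendered into the same version 1 structure:

```python
import json


def build_reassignment(assignment: dict) -> str:
    """Render {(topic, partition): [broker ids]} as reassignment JSON (version 1).

    Hypothetical helper, not part of Kafka.
    """
    partitions = [
        {"topic": topic, "partition": partition, "replicas": list(replicas)}
        for (topic, partition), replicas in sorted(assignment.items())
    ]
    return json.dumps({"partitions": partitions, "version": 1}, indent=2)


if __name__ == "__main__":
    # Same content as the file shown in the session: my-topic, partition 1, broker 1.
    print(build_reassignment({("my-topic", 1): [1]}))
```

Sorting the entries keeps the generated file stable between runs, which makes it easier to diff against a previously executed reassignment.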
ReassignPartitionsCommand is created exclusively when ReassignPartitionsCommand is requested to executeAssignment.
executeAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
opts: ReassignPartitionsCommandOptions): Unit // (1)
executeAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
reassignmentJsonString: String,
throttle: Throttle,
timeoutMs: Long = 10000L): Unit
(1) Uses the options for the reassignmentJsonString, throttle and timeoutMs inputs
executeAssignment…FIXME
reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean
reassignPartitions…FIXME
Note: reassignPartitions is used exclusively when ReassignPartitionsCommand is requested to executeAssignment.
alterReplicaLogDirsIgnoreReplicaNotAvailable(
replicaAssignment: Map[TopicPartitionReplica, String],
adminClient: JAdminClient,
timeoutMs: Long): Set[TopicPartitionReplica]
alterReplicaLogDirsIgnoreReplicaNotAvailable…FIXME
Note: alterReplicaLogDirsIgnoreReplicaNotAvailable is used exclusively when ReassignPartitionsCommand is requested to reassignPartitions.
generateAssignment(
zkClient: KafkaZkClient,
brokerListToReassign: Seq[Int],
topicsToMoveJsonString: String,
disableRackAware: Boolean)
: (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]])
generateAssignment…FIXME
Note: generateAssignment is used when…FIXME
verifyAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
jsonString: String): Unit
verifyAssignment…FIXME
Note: verifyAssignment is used when…FIXME
parseAndValidate(
zkClient: KafkaZkClient,
reassignmentJsonString: String)
: (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String])
parseAndValidate…FIXME
Note: parseAndValidate is used when…FIXME
removeThrottle(
zkClient: KafkaZkClient,
reassignedPartitionsStatus: Map[TopicPartition, ReassignmentStatus],
replicasReassignmentStatus: Map[TopicPartitionReplica, ReassignmentStatus],
adminZkClient: AdminZkClient): Unit
removeThrottle…FIXME
Note: removeThrottle is used when…FIXME
maybeLimit(throttle: Throttle): Unit
maybeLimit…FIXME
Note: maybeLimit is used when…FIXME
assignThrottledReplicas(
existingPartitionAssignment: Map[TopicPartition, Seq[Int]],
proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
adminZkClient: AdminZkClient): Unit
assignThrottledReplicas…FIXME
Note: assignThrottledReplicas is used when…FIXME
existingAssignment(): Map[TopicPartition, Seq[Int]]
existingAssignment takes the topics (from the keys of the proposedPartitionAssignment) and requests the KafkaZkClient to getReplicaAssignmentForTopics.
Note: existingAssignment is used when…FIXME
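The lookup that existingAssignment performs can be modelled in a few lines. This is an illustrative sketch in Python, not the Scala implementation: FakeZkClient is a hypothetical in-memory stand-in for KafkaZkClient, and it assumes the lookup returns every partition of the requested topics.

```python
from typing import Dict, List, NamedTuple


class TopicPartition(NamedTuple):
    topic: str
    partition: int


class FakeZkClient:
    """Hypothetical in-memory stand-in for KafkaZkClient (not the real API)."""

    def __init__(self, current: Dict[TopicPartition, List[int]]):
        self._current = current

    def get_replica_assignment_for_topics(self, topics):
        # Assumption: all partitions of the requested topics are returned.
        return {tp: r for tp, r in self._current.items() if tp.topic in topics}


def existing_assignment(zk_client, proposed: Dict[TopicPartition, List[int]]):
    topics = {tp.topic for tp in proposed}  # topics taken from the keys
    return zk_client.get_replica_assignment_for_topics(topics)


if __name__ == "__main__":
    zk = FakeZkClient({
        TopicPartition("my-topic", 0): [0],
        TopicPartition("my-topic", 1): [0],
        TopicPartition("other", 0): [2],
    })
    proposed = {TopicPartition("my-topic", 1): [1]}
    print(existing_assignment(zk, proposed))
```

Under that assumption the result covers both partitions of my-topic even though only partition 1 is part of the proposal, because the lookup is keyed by topic rather than by partition.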
ReassignPartitionsCommand takes the following when created:
- Optional AdminClient
- Proposed partition assignment (Map[TopicPartition, Seq[Int]])
- Proposed replica assignment (Map[TopicPartitionReplica, String])