Open
Description
目前的 shard 分配给 subTask 的逻辑,和 kafka 的 partition 分配逻辑类似,是按照 shard ID 取余匹配 subTaskIndex 的,但是 partition 一定是有序的,而 shard ID 很难做到 100% 有序。如下分配代码:
private List<LogstoreShardMeta> listAssignedShards() throws Exception {
List<String> logstores = getLogstores();
List<LogstoreShardMeta> shardMetas = new ArrayList<>();
for (String logstore : logstores) {
List<Shard> shards = logClient.listShards(project, logstore);
for (Shard shard : shards) {
LogstoreShardMeta shardMeta = new LogstoreShardMeta(logstore, shard.GetShardId(), shard.getStatus());
if (shardAssigner.assign(shardMeta, totalNumberOfSubtasks) % totalNumberOfSubtasks == indexOfThisSubtask) {
shardMetas.add(shardMeta);
}
}
}
return shardMetas;
}
当 shard ID 不连续时,存在分配不均衡的问题。如我们线上的 sls 的 shard ID 如下:
会造成 task-0、task-5、task-6 空跑,如:
Metadata
Metadata
Assignees
Labels
No labels