Skip to content

[Enhancement] (nereids)implement adminSetReplicaVersionCommand in nereids #50617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ supportedAdminStatement
| ADMIN CLEAN TRASH
(ON LEFT_PAREN backends+=STRING_LITERAL
(COMMA backends+=STRING_LITERAL)* RIGHT_PAREN)? #adminCleanTrash
| ADMIN SET REPLICA VERSION PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN #adminSetReplicaVersion
| ADMIN SET TABLE name=multipartIdentifier STATUS properties=propertyClause? #adminSetTableStatus
| ADMIN SET REPLICA STATUS PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN #adminSetReplicaStatus
| ADMIN REPAIR TABLE baseTableRef #adminRepairTable
Expand All @@ -612,8 +613,7 @@ supportedRecoverStatement
;

unsupportedAdminStatement
: ADMIN SET REPLICA VERSION PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN #adminSetReplicaVersion
| ADMIN SET (FRONTEND | (ALL FRONTENDS)) CONFIG
: ADMIN SET (FRONTEND | (ALL FRONTENDS)) CONFIG
(LEFT_PAREN propertyItemList RIGHT_PAREN)? ALL? #adminSetFrontendConfig
| ADMIN SET TABLE name=multipartIdentifier
PARTITION VERSION properties=propertyClause? #adminSetPartitionVersion
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.stats.HboPlanStatisticsManager;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaVersionCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AnalyzeCommand;
Expand Down Expand Up @@ -6652,6 +6653,18 @@ public void setReplicaVersion(AdminSetReplicaVersionStmt stmt) throws MetaNotFou
updateTime, false);
}

// Set specified replica's version. If replica does not exist, just ignore it.
public void setReplicaVersion(AdminSetReplicaVersionCommand command) throws MetaNotFoundException {
long tabletId = command.getTabletId();
long backendId = command.getBackendId();
Long version = command.getVersion();
Long lastSuccessVersion = command.getLastSuccessVersion();
Long lastFailedVersion = command.getLastFailedVersion();
long updateTime = System.currentTimeMillis();
setReplicaVersionInternal(tabletId, backendId, version, lastSuccessVersion, lastFailedVersion,
updateTime, false);
}

public void replaySetReplicaVersion(SetReplicaVersionOperationLog log) throws MetaNotFoundException {
setReplicaVersionInternal(log.getTabletId(), log.getBackendId(), log.getVersion(),
log.getLastSuccessVersion(), log.getLastFailedVersion(), log.getUpdateTime(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminRepairTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaVersionCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetTableStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesCommand;
Expand Down Expand Up @@ -5822,6 +5823,11 @@ public LogicalPlan visitAdminSetTableStatus(AdminSetTableStatusContext ctx) {
return new AdminSetTableStatusCommand(new TableNameInfo(dbTblNameParts), properties);
}

@Override
public LogicalPlan visitAdminSetReplicaVersion(DorisParser.AdminSetReplicaVersionContext ctx) {
return new AdminSetReplicaVersionCommand(visitPropertyItemList(ctx.propertyItemList()));
}

@Override
public LogicalPlan visitShowFrontends(ShowFrontendsContext ctx) {
String detail = (ctx.name != null) ? ctx.name.getText() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public enum PlanType {
RECOVER_PARTITION_COMMAND,
REPLAY_COMMAND,
ADMIN_REBALANCE_DISK_COMMAND,
ADMIN_SET_REPLICA_VERSION_COMMAND,
ADMIN_CANCEL_REBALANCE_DISK_COMMAND,
CREATE_ENCRYPTKEY_COMMAND,
CREATE_WORKLOAD_GROUP_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import java.util.Map;

/**
* admin set replicas status properties ("key" = "val", ..);
* Required:
* "tablet_id" = "10010",
* "backend_id" = "10001",
* Optional:
* "version" = "100",
* "last_success_version" = "100",
* "last_failed_version" = "-1",
*/
public class AdminSetReplicaVersionCommand extends Command implements ForwardWithSync {
public static final String TABLET_ID = "tablet_id";
public static final String BACKEND_ID = "backend_id";
public static final String VERSION = "version";
public static final String LAST_SUCCESS_VERSION = "last_success_version";
public static final String LAST_FAILED_VERSION = "last_failed_version";

private Map<String, String> properties;
private long tabletId = -1;
private long backendId = -1;
private Long version = null;
private Long lastSuccessVersion = null;
private Long lastFailedVersion = null;

public AdminSetReplicaVersionCommand(Map<String, String> properties) {
super(PlanType.ADMIN_SET_REPLICA_VERSION_COMMAND);
this.properties = properties;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate();
Env.getCurrentEnv().setReplicaVersion(this);
}

/**
* validate
*/
public void validate() throws AnalysisException {
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}

checkProperties();
}

private void checkProperties() throws AnalysisException {
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();

if (key.equalsIgnoreCase(TABLET_ID)) {
try {
tabletId = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid tablet id format: " + val);
}
} else if (key.equalsIgnoreCase(BACKEND_ID)) {
try {
backendId = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid backend id format: " + val);
}
} else if (key.equalsIgnoreCase(VERSION)) {
try {
version = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid version format: " + val);
}
if (version <= 0) {
throw new AnalysisException("Required version > 0");
}
} else if (key.equalsIgnoreCase(LAST_SUCCESS_VERSION)) {
try {
lastSuccessVersion = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid last success version format: " + val);
}
if (lastSuccessVersion <= 0) {
throw new AnalysisException("Required last success version > 0");
}
} else if (key.equalsIgnoreCase(LAST_FAILED_VERSION)) {
try {
lastFailedVersion = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid last failed version format: " + val);
}
if (lastFailedVersion <= 0 && lastFailedVersion != -1) {
throw new AnalysisException("Required last failed version > 0 or == -1");
}
} else {
throw new AnalysisException("Unknown property: " + key);
}
}

if (tabletId == -1 || backendId == -1
|| (version == null && lastSuccessVersion == null && lastFailedVersion == null)) {
throw new AnalysisException("Should add following properties: TABLET_ID, BACKEND_ID, "
+ "VERSION, LAST_SUCCESS_VERSION, LAST_FAILED_VERSION");
}
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAdminSetReplicaVersionCommand(this, context);
}

public Map<String, String> getProperties() {
return properties;
}

public long getTabletId() {
return tabletId;
}

public long getBackendId() {
return backendId;
}

public Long getVersion() {
return version;
}

public Long getLastSuccessVersion() {
return lastSuccessVersion;
}

public Long getLastFailedVersion() {
return lastFailedVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminRepairTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaVersionCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetTableStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesCommand;
Expand Down Expand Up @@ -1217,6 +1218,10 @@ default R visitCreateStageCommand(CreateStageCommand createStageCommand, C conte
return visitCommand(createStageCommand, context);
}

default R visitAdminSetReplicaVersionCommand(AdminSetReplicaVersionCommand command, C context) {
return visitCommand(command, context);
}

default R visitDropStageCommand(DropStageCommand dropStageCommand, C context) {
return visitCommand(dropStageCommand, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;

public class AdminSetReplicaVersionCommandTest extends TestWithFeService {
@Override
protected void runBeforeAll() throws Exception {
createDatabase("testversion");

createTable("CREATE TABLE testversion.tbl1version (\n"
+ " `id` int(11) NULL COMMENT \"\",\n"
+ " `id2` bitmap bitmap_union\n"
+ ") ENGINE=OLAP\n"
+ "AGGREGATE KEY(`id`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ");");

createTable("CREATE TABLE testversion.tbl2version (\n"
+ " `id` int(11) NULL COMMENT \"\",\n"
+ " `name` varchar(20) NULL\n"
+ ") ENGINE=OLAP\n"
+ "DUPLICATE KEY(`id`, `name`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ");");

// for test set replica version
createTable("CREATE TABLE testversion.tbl3version (\n"
+ " `id` int(11) NULL COMMENT \"\",\n"
+ " `name` varchar(20) NULL\n"
+ ") ENGINE=OLAP\n"
+ "DUPLICATE KEY(`id`, `name`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ");");
}

@Test
public void testAdminSetReplicaVersion() throws Exception {
Database db = Env.getCurrentInternalCatalog().getDbNullable("testversion");
Assertions.assertNotNull(db);
OlapTable tbl = (OlapTable) db.getTableNullable("tbl3version");
Assertions.assertNotNull(tbl);
// tablet id, backend id
List<Pair<Long, Long>> tabletToBackendList = Lists.newArrayList();
for (Partition partition : tbl.getPartitions()) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
tabletToBackendList.add(Pair.of(tablet.getId(), replica.getBackendId()));
}
}
}
}
Assertions.assertEquals(3, tabletToBackendList.size());
long tabletId = tabletToBackendList.get(0).first;
long backendId = tabletToBackendList.get(0).second;
Replica replica = Env.getCurrentInvertedIndex().getReplica(tabletId, backendId);

String sql = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '10', 'last_failed_version' = '100');";
LogicalPlan plan = new NereidsParser().parseSingle(sql);
Assertions.assertTrue(plan instanceof AdminSetReplicaVersionCommand);
Assertions.assertDoesNotThrow(() -> ((AdminSetReplicaVersionCommand) plan).run(connectContext, null));
Assertions.assertEquals(10L, replica.getVersion());
Assertions.assertEquals(10L, replica.getLastSuccessVersion());
Assertions.assertEquals(100L, replica.getLastFailedVersion());

sql = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '50');";
LogicalPlan plan1 = new NereidsParser().parseSingle(sql);
Assertions.assertTrue(plan1 instanceof AdminSetReplicaVersionCommand);

Assertions.assertDoesNotThrow(() -> ((AdminSetReplicaVersionCommand) plan1).run(connectContext, null));
Assertions.assertEquals(50L, replica.getVersion());
Assertions.assertEquals(50L, replica.getLastSuccessVersion());
Assertions.assertEquals(100L, replica.getLastFailedVersion());

sql = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '200');";
LogicalPlan plan2 = new NereidsParser().parseSingle(sql);
Assertions.assertTrue(plan2 instanceof AdminSetReplicaVersionCommand);

Assertions.assertDoesNotThrow(() -> ((AdminSetReplicaVersionCommand) plan2).run(connectContext, null));
Assertions.assertEquals(200L, replica.getVersion());
Assertions.assertEquals(200L, replica.getLastSuccessVersion());
Assertions.assertEquals(-1L, replica.getLastFailedVersion());

sql = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'last_failed_version' = '300');";
LogicalPlan plan3 = new NereidsParser().parseSingle(sql);
Assertions.assertTrue(plan instanceof AdminSetReplicaVersionCommand);

Assertions.assertDoesNotThrow(() -> ((AdminSetReplicaVersionCommand) plan3).run(connectContext, null));
Assertions.assertEquals(300L, replica.getLastFailedVersion());

sql = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'last_failed_version' = '-1');";
LogicalPlan plan4 = new NereidsParser().parseSingle(sql);
Assertions.assertTrue(plan4 instanceof AdminSetReplicaVersionCommand);

Assertions.assertDoesNotThrow(() -> ((AdminSetReplicaVersionCommand) plan4).run(connectContext, null));
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
}
}