Skip to content

optimize: split the task thread pool for committing and rollbacking statuses #6499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 20, 2024
15 changes: 15 additions & 0 deletions common/src/main/java/org/apache/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ public interface Constants {
*/
String UNDOLOG_DELETE = "UndologDelete";

/**
* The constant SYNC_PROCESSING
*/
String SYNC_PROCESSING = "SyncProcessing";

/**
* The constant Committing
*/
String COMMITTING = "Committing";

/**
* The constant Rollbacking
*/
String ROLLBACKING = "Rollbacking";

/**
* The constant AUTO_COMMIT
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.seata.server.coordinator;

import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -73,8 +75,11 @@
import org.slf4j.MDC;

import static org.apache.seata.common.Constants.ASYNC_COMMITTING;
import static org.apache.seata.common.Constants.COMMITTING;
import static org.apache.seata.common.Constants.RETRY_COMMITTING;
import static org.apache.seata.common.Constants.RETRY_ROLLBACKING;
import static org.apache.seata.common.Constants.ROLLBACKING;
import static org.apache.seata.common.Constants.SYNC_PROCESSING;
import static org.apache.seata.common.Constants.TX_TIMEOUT_CHECK;
import static org.apache.seata.common.Constants.UNDOLOG_DELETE;
import static org.apache.seata.common.DefaultValues.DEFAULT_ASYNC_COMMITTING_RETRY_PERIOD;
Expand Down Expand Up @@ -140,6 +145,11 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
*/
private static final int DEFAULT_BRANCH_ASYNC_QUEUE_SIZE = 5000;

/**
* the constant DEFAULT_SYNC_PROCESSING_DELAY
*/
private static final long DEFAULT_SYNC_PROCESSING_DELAY = 1000;

/**
* the pool size of branch asynchronous remove thread pool
*/
Expand Down Expand Up @@ -169,10 +179,17 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private final ScheduledThreadPoolExecutor undoLogDelete =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1));

private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking};
private final ScheduledThreadPoolExecutor syncProcessing =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(SYNC_PROCESSING, 1));

private final GlobalStatus[] retryRollbackingStatuses = new GlobalStatus[] {
GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying};

private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committed};

private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Committed};
private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.Rollbacking};
private final GlobalStatus[] committingStatuses = new GlobalStatus[] {GlobalStatus.Committing};

private final ThreadPoolExecutor branchRemoveExecutor;

Expand Down Expand Up @@ -366,7 +383,7 @@ protected void timeoutCheck() {
* Handle retry rollbacking.
*/
protected void handleRetryRollbacking() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
SessionCondition sessionCondition = new SessionCondition(retryRollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
Expand All @@ -376,12 +393,6 @@ protected void handleRetryRollbacking() {
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
// prevent repeated rollback
if (rollbackingSession.getStatus() == GlobalStatus.Rollbacking
&& !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
Expand Down Expand Up @@ -414,8 +425,7 @@ protected void handleRetryCommitting() {
SessionHelper.forEach(committingSessions, committingSession -> {
try {
// prevent repeated commit
if ((GlobalStatus.Committing.equals(committingSession.getStatus())
|| GlobalStatus.Committed.equals(committingSession.getStatus()))
if (GlobalStatus.Committed.equals(committingSession.getStatus())
&& !committingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
Expand Down Expand Up @@ -488,6 +498,96 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) {
return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout;
}

/**
* Handle rollbacking by scheduled.
*/
protected void handleRollbackingByScheduled() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
List<GlobalSession> rollbackingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(rollbackingSessions)) {
rollbackingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);
return;
}
rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
//The first one is the oldest one.
GlobalSession globalSession = rollbackingSessions.get(0);
if (!globalSession.isDeadSession()) {
rollbackingSchedule(System.currentTimeMillis() - globalSession.getBeginTime());
return;
}
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
}

SessionHelper.endRollbackFailed(rollbackingSession, true, true);

//The function of this 'return' is 'continue'.
return;
}
core.doGlobalRollback(rollbackingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to handle rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
rollbackingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);
}

private void rollbackingSchedule(long delay) {
syncProcessing.schedule(
() -> SessionHolder.distributedLockAndExecute(ROLLBACKING, this::handleRollbackingByScheduled),
delay, TimeUnit.MILLISECONDS);
}

/**
* Handle committing by scheduled.
*/
protected void handleCommittingByScheduled() {
SessionCondition sessionCondition = new SessionCondition(committingStatuses);
sessionCondition.setLazyLoadBranch(true);
List<GlobalSession> committingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(committingSessions)) {
committingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);
return;
}
committingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
//The first one is the oldest one.
GlobalSession globalSession = committingSessions.get(0);
if (!globalSession.isDeadSession()) {
committingSchedule(System.currentTimeMillis() - globalSession.getBeginTime());
return;
}
long now = System.currentTimeMillis();
SessionHelper.forEach(committingSessions, committingSession -> {
try {
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT, committingSession.getBeginTime())) {

// commit retry timeout event
SessionHelper.endCommitFailed(committingSession, true, true);

//The function of this 'return' is 'continue'.
return;
}
core.doGlobalCommit(committingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to handle committing [{}] {} {}", committingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
committingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);
}

private void committingSchedule(long delay) {
syncProcessing.schedule(
() -> SessionHolder.distributedLockAndExecute(COMMITTING, this::handleCommittingByScheduled),
delay, TimeUnit.MILLISECONDS);
}

/**
* Init.
*/
Expand All @@ -511,6 +611,10 @@ public void init() {
undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);

rollbackingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);

committingSchedule(DEFAULT_SYNC_PROCESSING_DELAY);
}

@Override
Expand Down
Loading