Skip to content

fix: Add ingest throttle for inner batch transactions #19103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,14 @@ public TransactionInfo runAllChecks(
}

// 4. Check throttles
assertThrottlingPreconditions(txInfo, configuration);
final var hederaConfig = configuration.getConfigData(HederaConfig.class);
if (hederaConfig.ingestThrottleEnabled() && synchronizedThrottleAccumulator.shouldThrottle(txInfo, state)) {
workflowMetrics.incrementThrottled(functionality);
throw new PreCheckException(BUSY);
checkThrottles(state, configuration, txInfo);
// If the transaction is a batch transaction, we need to check the throttling for each inner transaction
if (functionality == HederaFunctionality.ATOMIC_BATCH) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Suppose the batch has three inner transactions A, B, C. Assume that checkThrottles() passes for A and B, but fails for C.

Then this implementation will not "refund" the capacity used for A and B, even though those transactions are never actually handled.

This lets an attacker consume ingest throttle capacity without paying fees at consensus for that usage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I did miss that. Can you re-review the fix?

for (Bytes innerTxnBytes : requireNonNull(txBody.atomicBatch()).transactions()) {
final var innerTxn =
transactionChecker.parseSignedAndCheck(innerTxnBytes, maxIngestParseSize(configuration));
checkThrottles(state, configuration, innerTxn);
}
}

// 4a. Run pure checks
Expand Down Expand Up @@ -266,6 +269,17 @@ public TransactionInfo runAllChecks(
return txInfo;
}

/**
 * Applies the node's ingest throttle to a single (outer or inner) transaction, rejecting it
 * with {@code BUSY} when no throttle capacity remains.
 *
 * @param state the current node state consulted by the throttle accumulator
 * @param configuration the active configuration, used for throttle preconditions and the
 *     {@code HederaConfig.ingestThrottleEnabled} flag
 * @param txn the parsed transaction to check; for atomic batches the caller invokes this once
 *     for the batch itself and once per inner transaction
 * @throws PreCheckException with {@code BUSY} if the transaction should be throttled at ingest
 */
private void checkThrottles(
        @NonNull final State state, @NonNull final Configuration configuration, final TransactionInfo txn)
        throws PreCheckException {
    // Validate throttling preconditions before consuming any throttle capacity.
    assertThrottlingPreconditions(txn, configuration);
    final var hederaConfig = configuration.getConfigData(HederaConfig.class);
    // shouldThrottle() both checks and consumes capacity for this functionality.
    // NOTE(review): capacity consumed here is not refunded if a later inner transaction in the
    // same batch fails its throttle check — see the reviewer comment on this PR; confirm the
    // follow-up fix addresses the refund.
    if (hederaConfig.ingestThrottleEnabled() && synchronizedThrottleAccumulator.shouldThrottle(txn, state)) {
        // Track throttled transactions per-functionality for observability.
        workflowMetrics.incrementThrottled(txn.functionality());
        throw new PreCheckException(BUSY);
    }
}

private static int maxIngestParseSize(Configuration configuration) {
final var jumboTxnEnabled =
configuration.getConfigData(JumboTransactionsConfig.class).isEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void pureChecks(@NonNull final PureChecksContext context) throws PreCheck
// throw if more than one tx has the same transactionID
validateTruePreCheck(txIds.add(txBody.transactionID()), BATCH_LIST_CONTAINS_DUPLICATES);

// validate batch key exists on each inner transaction
// validates a batch key exists on each inner transaction
validateTruePreCheck(txBody.hasBatchKey(), MISSING_BATCH_KEY);

if (!txBody.hasNodeAccountID() || !txBody.nodeAccountIDOrThrow().equals(ATOMIC_BATCH_NODE_ACCOUNT_ID)) {
Expand All @@ -127,7 +127,7 @@ public void preHandle(@NonNull final PreHandleContext context) throws PreCheckEx
final var atomicBatchConfig = config.getConfigData(AtomicBatchConfig.class);

final var txns = atomicBatchTransactionBody.transactions();
// not using stream below as throwing exception from middle of functional pipeline is a terrible idea
// not using a stream below as throwing exception in the middle of a functional pipeline is a terrible idea
for (final var txnBytes : txns) {
final var innerTxBody = innerTxnCache.computeIfAbsent(txnBytes);
validateFalsePreCheck(isNotAllowedFunction(innerTxBody, atomicBatchConfig), BATCH_TRANSACTION_IN_BLACKLIST);
Expand Down Expand Up @@ -155,7 +155,7 @@ public void handle(@NonNull final HandleContext context) throws HandleException
// Timebox, and duplication checks are done on dispatch. So, no need to repeat here
final var recordedFeeCharging = new RecordedFeeCharging(appFeeCharging.get());
for (final var txnBytes : txns) {
// Use the unchecked get because if the transaction is correct it should be in the cache by now
// Use the unchecked get because if the transaction is correct, it should be in the cache by now
final TransactionBody innerTxnBody;
innerTxnBody = innerTxnCache.computeIfAbsentUnchecked(txnBytes);
final var payerId = innerTxnBody.transactionIDOrThrow().accountIDOrThrow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.BATCH_LIST_EMPTY;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.BATCH_SIZE_LIMIT_EXCEEDED;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.BATCH_TRANSACTION_IN_BLACKLIST;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.BUSY;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.DUPLICATE_TRANSACTION;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.INNER_TRANSACTION_FAILED;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.INSUFFICIENT_PAYER_BALANCE;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.INVALID_SIGNATURE;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.INVALID_TRANSACTION_DURATION;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.MAX_CHILD_RECORDS_EXCEEDED;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.MISSING_BATCH_KEY;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.NOT_SUPPORTED;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.TRANSACTION_OVERSIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -71,7 +71,6 @@
import java.time.Instant;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -400,7 +399,8 @@ public Stream<DynamicTest> contractCallMoreThanTPSLimit() {
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator))
.hasKnownStatus(INNER_TRANSACTION_FAILED)
// Should throttle at ingest
.hasPrecheck(BUSY)
.signedByPayerAnd(batchOperator)
.payingWith(payer));
}
Expand Down Expand Up @@ -481,7 +481,6 @@ public Stream<DynamicTest> exceedsInnerTxnLimit() {

@HapiTest
@DisplayName("Resubmit batch after INSUFFICIENT_PAYER_BALANCE")
@Disabled // Failed log validation: "Non-duplicate {} not cached for either payer or submitting node {}"
// BATCH_53
public Stream<DynamicTest> resubmitAfterInsufficientPayerBalance() {
return hapiTest(
Expand All @@ -492,7 +491,7 @@ public Stream<DynamicTest> resubmitAfterInsufficientPayerBalance() {
// batch will fail due to insufficient balance
atomicBatch(
cryptoCreate("foo").txnId("innerTxn1").batchKey("alice"),
cryptoCreate("foo").txnId("innerTxn1").batchKey("alice"))
cryptoCreate("foo").txnId("innerTxn2").batchKey("alice"))
.txnId("failingBatch")
.payingWith("alice")
.hasPrecheck(INSUFFICIENT_PAYER_BALANCE),
Expand All @@ -502,7 +501,7 @@ public Stream<DynamicTest> resubmitAfterInsufficientPayerBalance() {
// resubmit the batch
atomicBatch(
cryptoCreate("foo").txnId("innerTxn1").batchKey("alice"),
cryptoCreate("foo").txnId("innerTxn1").batchKey("alice"))
cryptoCreate("foo").txnId("innerTxn2").batchKey("alice"))
.txnId("failingBatch")
.payingWith("alice"));
}
Expand Down Expand Up @@ -548,7 +547,6 @@ public Stream<DynamicTest> nonInnerTransactionHasBatchKeyFails() {

@HapiTest
@DisplayName("Submit non batch inner transaction with invalid batch key should fail")
@Disabled // TODO: Enable this test when we have global batch key validation
// BATCH_55
public Stream<DynamicTest> nonInnerTxnWithInvalidBatchKey() {
return hapiTest(withOpContext((spec, opLog) -> {
Expand All @@ -558,8 +556,10 @@ public Stream<DynamicTest> nonInnerTxnWithInvalidBatchKey() {
.build();
// save invalid key in registry
spec.registry().saveKey("invalidKey", invalidKey);
// submit op with invalid batch key
final var op = cryptoCreate("foo").batchKey("invalidKey").hasPrecheck(NOT_SUPPORTED);
// submit op with an invalid batch key
final var op = cryptoCreate("foo")
.batchKey("invalidKey")
.hasKnownStatus(BATCH_KEY_SET_ON_NON_INNER_TRANSACTION);
allRunFor(spec, op);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -231,26 +232,11 @@ public Stream<DynamicTest> contractCallTPSLimit() {
uploadInitCode(contract),
contractCreate(contract),
overridingThrottles("testSystemFiles/artificial-limits.json"),
// create batch with 6 contract calls
atomicBatch(
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator),
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator),
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator),
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator),
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator),
contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator))
// create a batch with 1 contract call (the TPS limit is 3),
// and after frontend throttle scaling we can send only 1 per second
atomicBatch(contractCall(contract, function, payload)
.payingWith(payer)
.batchKey(batchOperator))
.signedByPayerAnd(batchOperator)
.payingWith(payer));
}
Expand Down Expand Up @@ -344,6 +330,7 @@ public Stream<DynamicTest> deletedAccountKeyAsBatchKey() {
.signedBy(payer, aliceKey));
}

@Disabled // No longer relevant, after implementing ingest throttle for the inner transactions
@LeakyHapiTest(requirement = {THROTTLE_OVERRIDES})
@DisplayName("Update throttles should take effect to following inner txns")
// BATCH_08
Expand Down
Loading