Skip to content

Commit ebe16e0

Browse files
fix: Fix schedules during replay (#12585)
Signed-off-by: Neeharika-Sompalli <[email protected]>
1 parent 3f3008e commit ebe16e0

File tree

5 files changed

+115
-45
lines changed

5 files changed

+115
-45
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/statedumpers/scheduledtransactions/ScheduledTransactionsDumpUtils.java

+33-22
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package com.hedera.node.app.statedumpers.scheduledtransactions;
1818

19+
import static com.hedera.node.app.service.mono.pbj.PbjConverter.fromPbj;
1920
import static com.hedera.node.app.service.mono.statedumpers.associations.BBMTokenAssociation.entityIdFrom;
2021
import static com.hedera.node.app.service.mono.statedumpers.scheduledtransactions.ScheduledTransactionsDumpUtils.reportOnScheduledTransactionsByEquality;
2122
import static com.hedera.node.app.service.mono.statedumpers.scheduledtransactions.ScheduledTransactionsDumpUtils.reportOnScheduledTransactionsByExpiry;
2223
import static com.hedera.node.app.service.mono.statedumpers.scheduledtransactions.ScheduledTransactionsDumpUtils.reportOnScheduledTransactionsById;
24+
import static com.hedera.node.app.service.schedule.impl.handlers.HandlerUtility.childAsOrdinary;
2325
import static com.swirlds.common.threading.manager.AdHocThreadManager.getStaticThreadManager;
2426

2527
import com.hedera.hapi.node.base.ScheduleID;
@@ -28,7 +30,6 @@
2830
import com.hedera.hapi.node.state.schedule.Schedule;
2931
import com.hedera.hapi.node.state.schedule.ScheduleList;
3032
import com.hedera.node.app.service.mono.legacy.core.jproto.JKey;
31-
import com.hedera.node.app.service.mono.pbj.PbjConverter;
3233
import com.hedera.node.app.service.mono.state.adapters.VirtualMapLike;
3334
import com.hedera.node.app.service.mono.state.submerkle.RichInstant;
3435
import com.hedera.node.app.service.mono.statedumpers.DumpCheckpoint;
@@ -52,7 +53,6 @@
5253
import java.util.Optional;
5354
import java.util.TreeMap;
5455
import java.util.concurrent.ConcurrentLinkedQueue;
55-
import java.util.concurrent.CopyOnWriteArrayList;
5656

5757
public class ScheduledTransactionsDumpUtils {
5858

@@ -63,38 +63,52 @@ public static void dumpModScheduledTransactions(
6363
@NonNull final VirtualMap<OnDiskKey<ProtoLong>, OnDiskValue<ScheduleList>> byExpiry,
6464
@NonNull final DumpCheckpoint checkpoint) {
6565
try (@NonNull final var writer = new Writer(path)) {
66-
final var dumpableScheduledTransactionsById = gatherModScheduledTransactionsById(scheduledTransactions);
67-
reportOnScheduledTransactionsById(writer, dumpableScheduledTransactionsById);
68-
System.out.printf(
69-
"=== mod scheduled transactions report is %d bytes at checkpoint %s%n",
70-
writer.getSize(), checkpoint.name());
71-
final var byEqualityDump = gatherModScheduledTransactionsByEquality(byEquality);
72-
reportOnScheduledTransactionsByEquality(writer, byEqualityDump);
66+
System.out.printf("=== Dumping schedule transactions %n ======");
67+
68+
final var byId = gatherModScheduledTransactionsById(scheduledTransactions);
69+
reportOnScheduledTransactionsById(writer, byId);
70+
System.out.println(
71+
"Size of byId in State : " + scheduledTransactions.size() + " and gathered : " + byId.size());
72+
7373
// Not sure how to compare Equality Virtual map in mono and mod
7474
final var byExpiryDump = gatherModScheduledTransactionsByExpiry(byExpiry);
7575
reportOnScheduledTransactionsByExpiry(writer, byExpiryDump);
76-
System.out.printf(
77-
"=== mod scheduled transactions by expiry report is %d bytes at checkpoint %s%n",
78-
writer.getSize(), checkpoint.name());
76+
System.out.println(
77+
"Size of byExpiry in State : " + byExpiry.size() + " and gathered : " + byExpiryDump.size());
78+
79+
try {
80+
final var byEqualityDump = gatherModScheduledTransactionsByEquality(byEquality);
81+
reportOnScheduledTransactionsByEquality(writer, byEqualityDump);
82+
System.out.println("Size of byEquality in State : " + byEquality.size() + " and gathered : "
83+
+ byEqualityDump.size());
84+
} catch (Exception e) {
85+
e.printStackTrace();
86+
System.out.println("Error in gathering byEqualityDump");
87+
}
7988
}
8089
}
8190

8291
private static List<BBMScheduledEqualityValue> gatherModScheduledTransactionsByEquality(
8392
final VirtualMap<OnDiskKey<ProtoBytes>, OnDiskValue<ScheduleList>> source) {
8493
final List<BBMScheduledEqualityValue> r = new ArrayList<>();
85-
final var scheduledTransactions = new CopyOnWriteArrayList<BBMScheduledEqualityValue>();
94+
final var scheduledTransactions = new ConcurrentLinkedQueue<BBMScheduledEqualityValue>();
8695

8796
try {
8897
VirtualMapLike.from(source)
8998
.extractVirtualMapDataC(
9099
getStaticThreadManager(),
91100
p -> scheduledTransactions.add(
92101
fromMod(p.key().getKey(), p.value().getValue())),
93-
8);
102+
1);
94103
} catch (final InterruptedException ex) {
95104
System.err.println("*** Traversal of scheduledTransactions by equality virtual map interrupted!");
96105
Thread.currentThread().interrupt();
97106
}
107+
108+
while (!scheduledTransactions.isEmpty()) {
109+
final var mapping = scheduledTransactions.poll();
110+
r.add(mapping);
111+
}
98112
return r;
99113
}
100114

@@ -132,11 +146,8 @@ private static Map<BBMScheduledId, BBMScheduledTransaction> gatherModScheduledTr
132146
VirtualMapLike.from(source)
133147
.extractVirtualMapDataC(
134148
getStaticThreadManager(),
135-
p -> {
136-
scheduledTransactions.add(Pair.of(
137-
fromMod(p.key().getKey()),
138-
fromMod(p.value().getValue())));
139-
},
149+
p -> scheduledTransactions.add(Pair.of(
150+
fromMod(p.key().getKey()), fromMod(p.value().getValue()))),
140151
8);
141152
} catch (final InterruptedException ex) {
142153
System.err.println("*** Traversal of scheduledTransactions virtual map interrupted!");
@@ -194,9 +205,9 @@ static BBMScheduledTransaction fromMod(@NonNull final Schedule value) {
194205
RichInstant.fromJava(Instant.ofEpochSecond(value.calculatedExpirationSecond())),
195206
RichInstant.fromJava(Instant.ofEpochSecond(
196207
value.resolutionTime().seconds(), value.resolutionTime().nanos())),
197-
PbjConverter.fromPbj(value.originalCreateTransaction()).toByteArray(),
198-
PbjConverter.fromPbj(value.originalCreateTransaction()),
199-
PbjConverter.fromPbj(value.scheduledTransaction()),
208+
fromPbj(value.originalCreateTransaction()).toByteArray(),
209+
fromPbj(childAsOrdinary(value)),
210+
fromPbj(value.scheduledTransaction()),
200211
value.signatories().stream().map(k -> toPrimitiveKey(k)).toList());
201212
}
202213

hedera-node/hedera-mono-service/src/main/java/com/hedera/node/app/service/mono/statedumpers/scheduledtransactions/ScheduledTransactionsDumpUtils.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,22 @@ public static void dumpMonoScheduledTransactions(
4949
final var byId = scheduledTransactions.byId();
5050
final var byEquality = scheduledTransactions.byEquality();
5151
final var byExpirationSecond = scheduledTransactions.byExpirationSecond();
52-
System.out.printf("=== Dumping schedule transactions %s%n ======");
52+
53+
System.out.printf("=== Dumping schedule transactions %n ======");
54+
5355
final var byIdDump = gatherMonoScheduledTransactionsByID(byId);
5456
reportOnScheduledTransactionsById(writer, byIdDump);
57+
System.out.println("Size of byId in State : " + byId.size() + " and gathered : " + byIdDump.size());
5558

5659
final var byEqualityDump = gatherMonoScheduledTransactionsByEquality(byEquality);
5760
reportOnScheduledTransactionsByEquality(writer, byEqualityDump);
58-
// Not sure how to compare Equality Virtual map in mono and mod
61+
System.out.println(
62+
"Size of byEquality in State : " + byEquality.size() + " and gathered : " + byEqualityDump.size());
63+
5964
final var byExpiryDump = gatherMonoScheduledTransactionsByExpiry(byExpirationSecond);
6065
reportOnScheduledTransactionsByExpiry(writer, byExpiryDump);
61-
System.out.printf(
62-
"=== mono scheduled transactions report by expiry is %d bytes at checkpoint %s%n",
63-
writer.getSize(), checkpoint.name());
66+
System.out.println("Size of byExpiry in State : " + byExpirationSecond.size() + " and gathered : "
67+
+ byExpiryDump.size());
6468
}
6569
}
6670

hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/ScheduleStoreUtility.java

+30
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@
1919
import com.google.common.hash.Hasher;
2020
import com.google.common.hash.Hashing;
2121
import com.hedera.hapi.node.base.Key;
22+
import com.hedera.hapi.node.base.ScheduleID;
2223
import com.hedera.hapi.node.scheduled.SchedulableTransactionBody;
2324
import com.hedera.hapi.node.state.schedule.Schedule;
25+
import com.hedera.hapi.node.state.schedule.ScheduleList;
2426
import com.hedera.pbj.runtime.io.buffer.Bytes;
2527
import edu.umd.cs.findbugs.annotations.NonNull;
28+
import edu.umd.cs.findbugs.annotations.Nullable;
2629
import java.nio.charset.StandardCharsets;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
2732
import java.util.Objects;
2833

2934
public final class ScheduleStoreUtility {
@@ -66,4 +71,29 @@ private static void addToHash(final Hasher hasher, final SchedulableTransactionB
6671
hasher.putInt(bytes.length);
6772
hasher.putBytes(bytes);
6873
}
74+
75+
private static boolean isScheduleInList(final ScheduleID scheduleId, final ScheduleList scheduleList) {
76+
return scheduleList.schedulesOrElse(Collections.emptyList()).stream()
77+
.anyMatch(s -> s.scheduleIdOrThrow().equals(scheduleId));
78+
}
79+
80+
static ScheduleList addOrReplace(final Schedule schedule, @Nullable final ScheduleList scheduleList) {
81+
if (scheduleList == null) {
82+
return new ScheduleList(Collections.singletonList(schedule));
83+
}
84+
final var newScheduleList = scheduleList.copyBuilder();
85+
final var scheduleId = schedule.scheduleIdOrThrow();
86+
final var schedules = new ArrayList<>(scheduleList.schedulesOrElse(Collections.emptyList()));
87+
if (!isScheduleInList(scheduleId, scheduleList)) {
88+
schedules.add(schedule);
89+
} else {
90+
for (int i = 0; i < schedules.size(); i++) {
91+
final var existingSchedule = schedules.get(i);
92+
if (existingSchedule.scheduleIdOrThrow().equals(scheduleId)) {
93+
schedules.set(i, schedule);
94+
}
95+
}
96+
}
97+
return newScheduleList.schedules(schedules).build();
98+
}
6999
}

hedera-node/hedera-schedule-service-impl/src/main/java/com/hedera/node/app/service/schedule/impl/WritableScheduleStoreImpl.java

+7-18
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.hedera.node.app.service.schedule.impl;
1818

19-
import static java.util.Collections.emptyList;
19+
import static com.hedera.node.app.service.schedule.impl.ScheduleStoreUtility.addOrReplace;
2020

2121
import com.hedera.hapi.node.base.ScheduleID;
2222
import com.hedera.hapi.node.base.Timestamp;
@@ -33,9 +33,6 @@
3333
import edu.umd.cs.findbugs.annotations.NonNull;
3434
import edu.umd.cs.findbugs.annotations.Nullable;
3535
import java.time.Instant;
36-
import java.util.ArrayList;
37-
import java.util.LinkedList;
38-
import java.util.List;
3936
import java.util.Objects;
4037
import org.apache.logging.log4j.LogManager;
4138
import org.apache.logging.log4j.Logger;
@@ -119,26 +116,18 @@ public Schedule getForModify(@Nullable final ScheduleID idToFind) {
119116
@Override
120117
public void put(@NonNull final Schedule scheduleToAdd) {
121118
schedulesByIdMutable.put(scheduleToAdd.scheduleIdOrThrow(), scheduleToAdd);
119+
122120
final ProtoBytes newHash = new ProtoBytes(ScheduleStoreUtility.calculateBytesHash(scheduleToAdd));
123121
final ScheduleList inStateEquality = schedulesByEqualityMutable.get(newHash);
124-
List<Schedule> byEquality =
125-
inStateEquality != null ? new LinkedList<>(inStateEquality.schedulesOrElse(emptyList())) : null;
126-
if (byEquality == null) {
127-
byEquality = new LinkedList<>();
128-
}
129-
byEquality.add(scheduleToAdd);
130-
schedulesByEqualityMutable.put(newHash, new ScheduleList(byEquality));
122+
final var newEqualityScheduleList = addOrReplace(scheduleToAdd, inStateEquality);
123+
schedulesByEqualityMutable.put(newHash, newEqualityScheduleList);
124+
131125
// calculated expiration time is never null...
132126
final ProtoLong expirationSecond = new ProtoLong(scheduleToAdd.calculatedExpirationSecond());
133127
final ScheduleList inStateExpiration = schedulesByExpirationMutable.get(expirationSecond);
134128
// we should not be modifying the schedules list directly. This could cause ISS
135-
List<Schedule> byExpiration = inStateExpiration != null ? new ArrayList<>(inStateExpiration.schedules()) : null;
136-
if (byExpiration == null) {
137-
byExpiration = new LinkedList<>();
138-
}
139-
byExpiration.add(scheduleToAdd);
140-
final var newScheduleList = new ScheduleList(byExpiration);
141-
schedulesByExpirationMutable.put(expirationSecond, newScheduleList);
129+
final var newExpiryScheduleList = addOrReplace(scheduleToAdd, inStateExpiration);
130+
schedulesByExpirationMutable.put(expirationSecond, newExpiryScheduleList);
142131
}
143132

144133
@NonNull

hedera-node/hedera-schedule-service-impl/src/test/java/com/hedera/node/app/service/schedule/impl/WritableScheduleStoreImplTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.hedera.hapi.node.base.Key;
2222
import com.hedera.hapi.node.base.ScheduleID;
2323
import com.hedera.hapi.node.base.Timestamp;
24+
import com.hedera.hapi.node.state.primitives.ProtoBytes;
25+
import com.hedera.hapi.node.state.primitives.ProtoLong;
2426
import com.hedera.hapi.node.state.schedule.Schedule;
2527
import com.hedera.node.app.spi.workflows.PreCheckException;
2628
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -69,6 +71,40 @@ void verifyPutModifiesState() {
6971
assertThat(actual.signatories()).containsExactlyInAnyOrderElementsOf(modifiedSignatories);
7072
}
7173

74+
@Test
75+
void verifyPutDoesDedupliction() {
76+
final ScheduleID idToDelete = scheduleInState.scheduleId();
77+
Schedule actual = writableById.getForModify(idToDelete);
78+
assertThat(actual).isNotNull();
79+
assertThat(actual.signatories()).containsExactlyInAnyOrderElementsOf(scheduleInState.signatories());
80+
final Set<Key> modifiedSignatories = Set.of(schedulerKey, payerKey);
81+
final Schedule modified = replaceSignatoriesAndMarkExecuted(actual, modifiedSignatories, testConsensusTime);
82+
final var hash = new ProtoBytes(ScheduleStoreUtility.calculateBytesHash(actual));
83+
84+
final var equalityList = writableByEquality.get(hash);
85+
assertThat(equalityList.schedules().size()).isEqualTo(1);
86+
87+
final var expiryList = writableByExpiration.get(new ProtoLong(actual.calculatedExpirationSecond()));
88+
assertThat(expiryList.schedules().size()).isEqualTo(1);
89+
90+
writableSchedules.put(modified);
91+
writableSchedules.put(modified);
92+
writableSchedules.put(modified);
93+
94+
actual = scheduleStore.get(idToDelete);
95+
assertThat(actual).isNotNull();
96+
assertThat(actual.executed()).isTrue();
97+
assertThat(actual.resolutionTime()).isNotNull().isEqualTo(asTimestamp(testConsensusTime));
98+
assertThat(actual.signatories()).containsExactlyInAnyOrderElementsOf(modifiedSignatories);
99+
100+
// size doesn't increase when same element is put multiple times
101+
final var equalityListAfter = writableByEquality.get(hash);
102+
assertThat(equalityListAfter.schedules().size()).isEqualTo(1);
103+
104+
final var expiryListAfter = writableByExpiration.get(new ProtoLong(actual.calculatedExpirationSecond()));
105+
assertThat(expiryListAfter.schedules().size()).isEqualTo(1);
106+
}
107+
72108
@Test
73109
void purgesExpiredSchedules() {
74110
final ScheduleID idToDelete = scheduleInState.scheduleId();

0 commit comments

Comments
 (0)