Skip to content

Commit 703c69d

Browse files
authored
feat: construct schedulers automatically (#12504)
Signed-off-by: Cody Littley <[email protected]>
1 parent 4a8477e commit 703c69d

File tree

6 files changed

+86
-64
lines changed

6 files changed

+86
-64
lines changed

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/component/ComponentWiring.java

+53
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
package com.swirlds.common.wiring.component;
1818

19+
import static com.swirlds.common.wiring.model.HyperlinkBuilder.platformCoreHyperlink;
20+
1921
import com.swirlds.common.wiring.component.internal.FilterToBind;
2022
import com.swirlds.common.wiring.component.internal.InputWireToBind;
2123
import com.swirlds.common.wiring.component.internal.TransformerToBind;
2224
import com.swirlds.common.wiring.component.internal.WiringComponentProxy;
2325
import com.swirlds.common.wiring.model.WiringModel;
2426
import com.swirlds.common.wiring.schedulers.TaskScheduler;
27+
import com.swirlds.common.wiring.schedulers.builders.TaskSchedulerConfiguration;
2528
import com.swirlds.common.wiring.transformers.WireFilter;
2629
import com.swirlds.common.wiring.transformers.WireTransformer;
2730
import com.swirlds.common.wiring.wires.input.BindableInputWire;
@@ -97,7 +100,10 @@ public class ComponentWiring<COMPONENT_TYPE, OUTPUT_TYPE> {
97100
* @param model the wiring model that will contain the component
98101
* @param clazz the interface class of the component
99102
* @param scheduler the task scheduler that will run the component
103+
* @deprecated use {@link #ComponentWiring(WiringModel, Class, TaskSchedulerConfiguration)} instead. Once all uses
104+
* have been updated, this constructor will be removed.
100105
*/
106+
@Deprecated
101107
public ComponentWiring(
102108
@NonNull final WiringModel model,
103109
@NonNull final Class<COMPONENT_TYPE> clazz,
@@ -113,6 +119,43 @@ public ComponentWiring(
113119
proxyComponent = (COMPONENT_TYPE) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] {clazz}, proxy);
114120
}
115121

122+
/**
123+
* Create a new component wiring.
124+
*
125+
* @param model the wiring model that will contain the component
126+
* @param clazz the interface class of the component
127+
* @param schedulerConfiguration for the task scheduler that will run the component
128+
*/
129+
public ComponentWiring(
130+
@NonNull final WiringModel model,
131+
@NonNull final Class<COMPONENT_TYPE> clazz,
132+
@NonNull final TaskSchedulerConfiguration schedulerConfiguration) {
133+
134+
this.model = Objects.requireNonNull(model);
135+
Objects.requireNonNull(schedulerConfiguration);
136+
137+
final String schedulerName;
138+
final SchedulerLabel schedulerLabelAnnotation = clazz.getAnnotation(SchedulerLabel.class);
139+
if (schedulerLabelAnnotation == null) {
140+
schedulerName = clazz.getSimpleName();
141+
} else {
142+
schedulerName = schedulerLabelAnnotation.value();
143+
}
144+
145+
this.scheduler = model.schedulerBuilder(schedulerName)
146+
.configure(schedulerConfiguration)
147+
// FUTURE WORK: all components not currently in platform core should move there
148+
.withHyperlink(platformCoreHyperlink(clazz))
149+
.build()
150+
.cast();
151+
152+
if (!clazz.isInterface()) {
153+
throw new IllegalArgumentException("Component class " + clazz.getName() + " is not an interface.");
154+
}
155+
156+
proxyComponent = (COMPONENT_TYPE) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] {clazz}, proxy);
157+
}
158+
116159
/**
117160
* Get the output wire of this component.
118161
*
@@ -566,4 +609,14 @@ public void bind(@NonNull final COMPONENT_TYPE component) {
566609
filterToBind.filter().bind(x -> filterToBind.predicate().apply(component, x));
567610
}
568611
}
612+
613+
/**
614+
* Get the name of the scheduler that is running this component.
615+
*
616+
* @return the name of the scheduler
617+
*/
618+
@NonNull
619+
public String getSchedulerName() {
620+
return scheduler.getName();
621+
}
569622
}

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/component/SchedulerLabel.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323
import java.lang.annotation.Target;
2424

2525
/**
26-
* Annotates a method parameter used to implement a transformer/filter. Use this to override the name of the task
27-
* scheduler used to operate the transformer/filter.
26+
* Annotates a name that should be used as an override for the task scheduler's name instead of a component's interface
27+
* name. Can also be used to annotate a method parameter used to implement a transformer/filter (these get turned into
28+
* direct schedulers, which need to be named).
2829
*/
2930
@Retention(RetentionPolicy.RUNTIME)
30-
@Target(ElementType.METHOD)
31+
@Target({ElementType.TYPE, ElementType.METHOD})
3132
public @interface SchedulerLabel {
3233

3334
/**

platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/component/ComponentWiringTests.java

+23-47
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
import com.swirlds.common.context.PlatformContext;
2424
import com.swirlds.common.test.fixtures.platform.TestPlatformContextBuilder;
2525
import com.swirlds.common.wiring.model.WiringModel;
26-
import com.swirlds.common.wiring.schedulers.TaskScheduler;
27-
import com.swirlds.common.wiring.schedulers.builders.TaskSchedulerType;
26+
import com.swirlds.common.wiring.schedulers.builders.TaskSchedulerConfiguration;
2827
import com.swirlds.common.wiring.wires.input.InputWire;
2928
import com.swirlds.common.wiring.wires.output.OutputWire;
3029
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -109,6 +108,7 @@ public long getRunningValue() {
109108
}
110109
}
111110

111+
@SchedulerLabel("actuallyCallThisSomethingDifferent")
112112
private interface ComponentWithListOutput {
113113
@NonNull
114114
List<String> handleInputA(@NonNull String s);
@@ -154,14 +154,11 @@ void methodNotOnComponentTest() {
154154

155155
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
156156

157-
final TaskScheduler<Long> scheduler = wiringModel
158-
.schedulerBuilder("test")
159-
.withType(TaskSchedulerType.DIRECT)
160-
.build()
161-
.cast();
157+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
162158

163159
final ComponentWiring<FooBarBaz, Long> fooBarBazWiring =
164-
new ComponentWiring<>(wiringModel, FooBarBaz.class, scheduler);
160+
new ComponentWiring<>(wiringModel, FooBarBaz.class, schedulerConfiguration);
161+
assertEquals("FooBarBaz", fooBarBazWiring.getSchedulerName());
165162

166163
assertThrows(IllegalArgumentException.class, () -> fooBarBazWiring.getInputWire((x, y) -> 0L));
167164
assertThrows(IllegalArgumentException.class, () -> fooBarBazWiring.getInputWire((x, y) -> {}));
@@ -178,14 +175,11 @@ void simpleComponentTest(final int bindLocation) {
178175

179176
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
180177

181-
final TaskScheduler<Long> scheduler = wiringModel
182-
.schedulerBuilder("test")
183-
.withType(TaskSchedulerType.DIRECT)
184-
.build()
185-
.cast();
178+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
186179

187180
final ComponentWiring<FooBarBaz, Long> fooBarBazWiring =
188-
new ComponentWiring<>(wiringModel, FooBarBaz.class, scheduler);
181+
new ComponentWiring<>(wiringModel, FooBarBaz.class, schedulerConfiguration);
182+
assertEquals("FooBarBaz", fooBarBazWiring.getSchedulerName());
189183

190184
final FooBarBazImpl fooBarBazImpl = new FooBarBazImpl();
191185

@@ -270,16 +264,13 @@ void transformerTest(final int bindLocation) {
270264

271265
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
272266

273-
final TaskScheduler<Long> scheduler = wiringModel
274-
.schedulerBuilder("test")
275-
.withType(TaskSchedulerType.DIRECT)
276-
.build()
277-
.cast();
267+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
278268

279269
final FooBarBazImpl fooBarBazImpl = new FooBarBazImpl();
280270

281271
final ComponentWiring<FooBarBaz, Long> fooBarBazWiring =
282-
new ComponentWiring<>(wiringModel, FooBarBaz.class, scheduler);
272+
new ComponentWiring<>(wiringModel, FooBarBaz.class, schedulerConfiguration);
273+
assertEquals("FooBarBaz", fooBarBazWiring.getSchedulerName());
283274

284275
if (bindLocation == 0) {
285276
fooBarBazWiring.bind(fooBarBazImpl);
@@ -342,16 +333,13 @@ void filterTest(final int bindLocation) {
342333

343334
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
344335

345-
final TaskScheduler<Long> scheduler = wiringModel
346-
.schedulerBuilder("test")
347-
.withType(TaskSchedulerType.DIRECT)
348-
.build()
349-
.cast();
336+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
350337

351338
final FooBarBazImpl fooBarBazImpl = new FooBarBazImpl();
352339

353340
final ComponentWiring<FooBarBaz, Long> fooBarBazWiring =
354-
new ComponentWiring<>(wiringModel, FooBarBaz.class, scheduler);
341+
new ComponentWiring<>(wiringModel, FooBarBaz.class, schedulerConfiguration);
342+
assertEquals("FooBarBaz", fooBarBazWiring.getSchedulerName());
355343

356344
if (bindLocation == 0) {
357345
fooBarBazWiring.bind(fooBarBazImpl);
@@ -416,14 +404,11 @@ void splitterTest(final int bindLocation) {
416404

417405
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
418406

419-
final TaskScheduler<List<String>> scheduler = wiringModel
420-
.schedulerBuilder("test")
421-
.withType(TaskSchedulerType.DIRECT)
422-
.build()
423-
.cast();
407+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
424408

425409
final ComponentWiring<ComponentWithListOutput, List<String>> componentWiring =
426-
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, scheduler);
410+
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, schedulerConfiguration);
411+
assertEquals("actuallyCallThisSomethingDifferent", componentWiring.getSchedulerName());
427412

428413
if (bindLocation == 0) {
429414
componentWiring.bind(new ComponentWithListOutputImpl());
@@ -458,14 +443,11 @@ void filteredSplitterTest(final int bindLocation) {
458443

459444
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
460445

461-
final TaskScheduler<List<String>> scheduler = wiringModel
462-
.schedulerBuilder("test")
463-
.withType(TaskSchedulerType.DIRECT)
464-
.build()
465-
.cast();
446+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
466447

467448
final ComponentWiring<ComponentWithListOutput, List<String>> componentWiring =
468-
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, scheduler);
449+
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, schedulerConfiguration);
450+
assertEquals("actuallyCallThisSomethingDifferent", componentWiring.getSchedulerName());
469451

470452
if (bindLocation == 0) {
471453
componentWiring.bind(new ComponentWithListOutputImpl());
@@ -509,17 +491,11 @@ void transformedSplitterTest(final int bindLocation) {
509491

510492
final WiringModel wiringModel = WiringModel.create(platformContext, ForkJoinPool.commonPool());
511493

512-
final TaskScheduler<List<String>> scheduler = wiringModel
513-
.schedulerBuilder("test")
514-
.withType(TaskSchedulerType.DIRECT)
515-
.withUncaughtExceptionHandler((t, e) -> {
516-
e.printStackTrace();
517-
})
518-
.build()
519-
.cast();
494+
final TaskSchedulerConfiguration schedulerConfiguration = TaskSchedulerConfiguration.parse("DIRECT");
520495

521496
final ComponentWiring<ComponentWithListOutput, List<String>> componentWiring =
522-
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, scheduler);
497+
new ComponentWiring<>(wiringModel, ComponentWithListOutput.class, schedulerConfiguration);
498+
assertEquals("actuallyCallThisSomethingDifferent", componentWiring.getSchedulerName());
523499

524500
if (bindLocation == 0) {
525501
componentWiring.bind(new ComponentWithListOutputImpl());

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulers.java

-8
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import com.swirlds.platform.event.preconsensus.PcesWriter;
4141
import com.swirlds.platform.event.stream.EventStreamManager;
4242
import com.swirlds.platform.event.validation.EventSignatureValidator;
43-
import com.swirlds.platform.event.validation.InternalEventValidator;
4443
import com.swirlds.platform.eventhandling.ConsensusRoundHandler;
4544
import com.swirlds.platform.eventhandling.TransactionPrehandler;
4645
import com.swirlds.platform.gossip.shadowgraph.Shadowgraph;
@@ -65,7 +64,6 @@
6564
*
6665
* @param eventHasherScheduler the scheduler for the event hasher
6766
* @param postHashCollectorScheduler the scheduler for the post hash collector
68-
* @param internalEventValidatorScheduler the scheduler for the internal event validator
6967
* @param eventDeduplicatorScheduler the scheduler for the event deduplicator
7068
* @param eventSignatureValidatorScheduler the scheduler for the event signature validator
7169
* @param orphanBufferScheduler the scheduler for the orphan buffer
@@ -92,7 +90,6 @@
9290
public record PlatformSchedulers(
9391
@NonNull TaskScheduler<GossipEvent> eventHasherScheduler,
9492
@NonNull TaskScheduler<GossipEvent> postHashCollectorScheduler,
95-
@NonNull TaskScheduler<GossipEvent> internalEventValidatorScheduler,
9693
@NonNull TaskScheduler<GossipEvent> eventDeduplicatorScheduler,
9794
@NonNull TaskScheduler<GossipEvent> eventSignatureValidatorScheduler,
9895
@NonNull TaskScheduler<List<GossipEvent>> orphanBufferScheduler,
@@ -151,11 +148,6 @@ public static PlatformSchedulers create(
151148
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
152149
.build()
153150
.cast(),
154-
model.schedulerBuilder("internalEventValidator")
155-
.configure(config.internalEventValidator())
156-
.withHyperlink(platformCoreHyperlink(InternalEventValidator.class))
157-
.build()
158-
.cast(),
159151
model.schedulerBuilder("eventDeduplicator")
160152
.withType(config.eventDeduplicatorSchedulerType())
161153
.withUnhandledTaskCapacity(config.eventDeduplicatorUnhandledCapacity())

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ public PlatformWiring(
173173

174174
this.platformContext = Objects.requireNonNull(platformContext);
175175

176-
final PlatformSchedulersConfig schedulersConfig =
176+
final PlatformSchedulersConfig config =
177177
platformContext.getConfiguration().getConfigData(PlatformSchedulersConfig.class);
178178

179179
final int coreCount = Runtime.getRuntime().availableProcessors();
180-
final int parallelism = (int) Math.max(
181-
1, schedulersConfig.defaultPoolMultiplier() * coreCount + schedulersConfig.defaultPoolConstant());
180+
final int parallelism =
181+
(int) Math.max(1, config.defaultPoolMultiplier() * coreCount + config.defaultPoolConstant());
182182
final ForkJoinPool defaultPool = new ForkJoinPool(parallelism);
183183
logger.info(STARTUP.getMarker(), "Default platform pool parallelism: {}", parallelism);
184184

@@ -216,8 +216,8 @@ public PlatformWiring(
216216
eventHasherWiring = new ComponentWiring<>(model, EventHasher.class, schedulers.eventHasherScheduler());
217217
postHashCollectorWiring =
218218
new PassThroughWiring<>(model, "GossipEvent", schedulers.postHashCollectorScheduler());
219-
internalEventValidatorWiring = new ComponentWiring<>(
220-
model, InternalEventValidator.class, schedulers.internalEventValidatorScheduler());
219+
internalEventValidatorWiring =
220+
new ComponentWiring<>(model, InternalEventValidator.class, config.internalEventValidator());
221221
eventDeduplicatorWiring =
222222
new ComponentWiring<>(model, EventDeduplicator.class, schedulers.eventDeduplicatorScheduler());
223223
eventSignatureValidatorWiring = new ComponentWiring<>(

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pcli diagram \
2323
-s 'eventCreationManager:non-validated events:🍎' \
2424
-s 'Mystery Input:mystery data:X' \
2525
-s 'stateSigner:signature transactions:🖋️' \
26-
-g 'Event Validation:internalEventValidator,eventDeduplicator,eventSignatureValidator' \
26+
-g 'Event Validation:InternalEventValidator,eventDeduplicator,eventSignatureValidator' \
2727
-g 'Event Hashing:eventHasher,postHashCollector' \
2828
-g 'Orphan Buffer:orphanBuffer,orphanBufferSplitter' \
2929
-g 'Consensus Engine:consensusEngine,consensusEngineSplitter,eventWindowManager,getKeystoneEventSequenceNumber' \

0 commit comments

Comments
 (0)