Skip to content

Commit d4c425b

Browse files
authored
LoomServer parallel listener start (#7200)
* Addresses issues #7051, #6434, #6498
1 parent 5b99f69 commit d4c425b

File tree

6 files changed

+188
-50
lines changed

6 files changed

+188
-50
lines changed

inject/api/src/main/java/io/helidon/inject/api/InjectionServices.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ default Services services() {
184184
* <p>
185185
* If the service provider does not support shutdown an empty is returned.
186186
* <p>
187-
* The default reference implementation for Injection will return a map of all service types that were deactivated to any
187+
* The default reference implementation will return a map of all service types that were deactivated to any
188188
* throwable that was observed during that services shutdown sequence.
189189
* <p>
190190
* The order in which services are deactivated is dependent upon whether the {@link #activationLog()} is available.
@@ -194,6 +194,8 @@ default Services services() {
194194
* the same {@link RunLevel} value then the ordering will be based upon the implementation's comparator.
195195
* <p>
196196
* When shutdown returns, it is guaranteed that all services were shutdown, or failed to achieve shutdown.
197+
* <p>
198+
* The shutdown timeout from {@link InjectionServicesConfigBlueprint#shutdownTimeout()} will be applied as the default.
197199
*
198200
* @return a map of all managed service types deactivated to results of deactivation, or empty if shutdown is not supported
199201
*/

inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultInjectionServices.java

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636
import java.util.concurrent.atomic.AtomicReference;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3739
import java.util.stream.Collectors;
3840

3941
import io.helidon.common.config.Config;
@@ -69,6 +71,7 @@
6971
class DefaultInjectionServices implements InjectionServices, Resettable {
7072
static final System.Logger LOGGER = System.getLogger(DefaultInjectionServices.class.getName());
7173

74+
private final ReentrantReadWriteLock lifecycleLock = new ReentrantReadWriteLock();
7275
private final AtomicBoolean initializingServicesStarted = new AtomicBoolean(false);
7376
private final AtomicBoolean initializingServicesFinished = new AtomicBoolean(false);
7477
private final AtomicBoolean isBinding = new AtomicBoolean(false);
@@ -149,32 +152,33 @@ public Optional<Set<ServiceInfoCriteria>> lookups() {
149152

150153
@Override
151154
public Optional<DefaultServices> services(boolean initialize) {
152-
if (!initialize) {
153-
return Optional.ofNullable(services.get());
154-
}
155+
boolean isWriteLock = initialize;
156+
Lock lock = (isWriteLock) ? lifecycleLock.writeLock() : lifecycleLock.readLock();
157+
lock.lock();
158+
try {
159+
if (!initialize) {
160+
return Optional.ofNullable(services.get());
161+
}
155162

156-
if (!initializingServicesStarted.getAndSet(true)) {
157-
try {
158-
initializeServices();
159-
} catch (Throwable t) {
160-
state.lastError(t);
161-
initializingServicesStarted.set(false);
162-
if (t instanceof InjectionException) {
163-
throw (InjectionException) t;
164-
} else {
165-
throw new InjectionException("Failed to initialize: " + t.getMessage(), t);
163+
init();
164+
165+
DefaultServices thisServices = services.get();
166+
if (thisServices == null) {
167+
lock.unlock();
168+
lock = lifecycleLock.writeLock();
169+
lock.lock();
170+
reset(true);
171+
init();
172+
thisServices = services.get();
173+
174+
if (thisServices == null) {
175+
throw new InjectionException("Unable to reset() after shutdown()");
166176
}
167-
} finally {
168-
state.finished(true);
169-
initializingServicesFinished.set(true);
170177
}
178+
return Optional.of(thisServices);
179+
} finally {
180+
lock.unlock();
171181
}
172-
173-
DefaultServices thisServices = services.get();
174-
if (thisServices == null) {
175-
throw new InjectionException("Must reset() after shutdown()");
176-
}
177-
return Optional.of(thisServices);
178182
}
179183

180184
@Override
@@ -183,16 +187,18 @@ public Optional<Map<TypeName, ActivationResult>> shutdown() {
183187
DefaultServices current = services.get();
184188
if (services.compareAndSet(current, null) && current != null) {
185189
State currentState = state.clone().currentPhase(Phase.PRE_DESTROYING);
186-
log("started shutdown");
190+
log("Started shutdown");
187191
result = doShutdown(current, currentState);
188-
log("finished shutdown");
192+
log("Finished shutdown");
189193
}
190194
return Optional.ofNullable(result);
191195
}
192196

193197
@Override
194198
// note that this is typically only called during testing, and also in the injection maven-plugin
195199
public boolean reset(boolean deep) {
200+
Lock lock = lifecycleLock.writeLock();
201+
lock.lock();
196202
try {
197203
assertNotInitializing();
198204
if (isInitializing() || isInitialized()) {
@@ -228,6 +234,8 @@ public boolean reset(boolean deep) {
228234
throw new InjectionException("Failed to reset (state=" + state
229235
+ ", isInitialized=" + isInitialized()
230236
+ ", isInitializing=" + isInitializing() + ")", e);
237+
} finally {
238+
lock.unlock();
231239
}
232240
}
233241

@@ -291,6 +299,25 @@ private void assertNotInitializing() {
291299
}
292300
}
293301

302+
private void init() {
303+
if (!initializingServicesStarted.getAndSet(true)) {
304+
try {
305+
initializeServices();
306+
} catch (Throwable t) {
307+
state.lastError(t);
308+
initializingServicesStarted.set(false);
309+
if (t instanceof InjectionException) {
310+
throw (InjectionException) t;
311+
} else {
312+
throw new InjectionException("Failed to initialize: " + t.getMessage(), t);
313+
}
314+
} finally {
315+
state.finished(true);
316+
initializingServicesFinished.set(true);
317+
}
318+
}
319+
}
320+
294321
private void initializeServices() {
295322
initializationCallingContext = CallingContextFactory.create(false).orElse(null);
296323

@@ -460,6 +487,16 @@ private class Shutdown implements Callable<Map<TypeName, ActivationResult>> {
460487

461488
@Override
462489
public Map<TypeName, ActivationResult> call() {
490+
Lock lock = lifecycleLock.writeLock();
491+
lock.lock();
492+
try {
493+
return doShutdown();
494+
} finally {
495+
lock.unlock();
496+
}
497+
}
498+
499+
private Map<TypeName, ActivationResult> doShutdown() {
463500
state.currentPhase(Phase.DESTROYED);
464501

465502
ActivationLogQuery query = log.toQuery().orElse(null);
@@ -497,7 +534,7 @@ public Map<TypeName, ActivationResult> call() {
497534
doFinalShutdown(serviceProviders);
498535

499536
// finally, clear everything
500-
reset(false);
537+
reset(true);
501538

502539
return map;
503540
}

inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultServices.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,15 +437,15 @@ void bind(InjectionServices injectionServices,
437437
DefaultInjectionPlanBinder binder,
438438
Application app) {
439439
String appName = app.named().orElse(app.getClass().getName());
440-
boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.INFO);
440+
boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.DEBUG);
441441
if (isLoggable) {
442-
DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "starting binding application: " + appName);
442+
DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Starting binding application: " + appName);
443443
}
444444
try {
445445
app.configure(binder);
446446
bind(createServiceProvider(app, injectionServices));
447447
if (isLoggable) {
448-
DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "finished binding application: " + appName);
448+
DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Finished binding application: " + appName);
449449
}
450450
} catch (Exception e) {
451451
throw new InjectionException("Failed to process: " + app, e);

inject/tests/resources-inject/src/test/java/io/helidon/inject/tests/inject/tbox/ToolBoxTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.helidon.common.types.TypeName;
2626
import io.helidon.config.Config;
2727
import io.helidon.inject.api.ActivationResult;
28-
import io.helidon.inject.api.InjectionException;
2928
import io.helidon.inject.api.InjectionServices;
3029
import io.helidon.inject.api.ModuleComponent;
3130
import io.helidon.inject.api.Qualifier;
@@ -34,14 +33,14 @@
3433
import io.helidon.inject.api.ServiceProvider;
3534
import io.helidon.inject.api.Services;
3635
import io.helidon.inject.testing.InjectionTestingSupport;
37-
import io.helidon.inject.tests.inject.stacking.CommonContract;
38-
import io.helidon.inject.tests.inject.tbox.impl.BigHammer;
39-
import io.helidon.inject.tests.inject.tbox.impl.HandSaw;
4036
import io.helidon.inject.tests.inject.ASerialProviderImpl;
4137
import io.helidon.inject.tests.inject.ClassNamedY;
4238
import io.helidon.inject.tests.inject.TestingSingleton;
4339
import io.helidon.inject.tests.inject.provider.FakeConfig;
4440
import io.helidon.inject.tests.inject.provider.FakeServer;
41+
import io.helidon.inject.tests.inject.stacking.CommonContract;
42+
import io.helidon.inject.tests.inject.tbox.impl.BigHammer;
43+
import io.helidon.inject.tests.inject.tbox.impl.HandSaw;
4544
import io.helidon.inject.tests.inject.tbox.impl.MainToolBox;
4645
import io.helidon.inject.tools.Options;
4746

@@ -50,7 +49,7 @@
5049
import org.junit.jupiter.api.BeforeEach;
5150
import org.junit.jupiter.api.Test;
5251

53-
import static io.helidon.common.types.TypeName.*;
52+
import static io.helidon.common.types.TypeName.create;
5453
import static io.helidon.inject.testing.InjectionTestingSupport.resetAll;
5554
import static io.helidon.inject.testing.InjectionTestingSupport.testableServices;
5655
import static io.helidon.inject.tests.inject.TestUtils.loadStringFromFile;
@@ -64,7 +63,6 @@
6463
import static org.hamcrest.Matchers.greaterThan;
6564
import static org.hamcrest.Matchers.hasEntry;
6665
import static org.junit.jupiter.api.Assertions.assertNotNull;
67-
import static org.junit.jupiter.api.Assertions.assertThrows;
6866

6967
/**
7068
* Expectation here is that the annotation processor ran, and we can use standard injection and di registry services, etc.
@@ -314,9 +312,6 @@ void startupAndShutdownCallsPostConstructAndPreDestroy() {
314312

315313
assertThat(injectionServices.metrics().orElseThrow().lookupCount().orElse(0), equalTo(0));
316314

317-
InjectionException e = assertThrows(InjectionException.class, () -> injectionServices.services());
318-
assertThat(e.getMessage(), equalTo("Must reset() after shutdown()"));
319-
320315
tearDown();
321316
setUp();
322317
TestingSingleton testingSingletonFromLookup2 = injectionServices.services().lookup(TestingSingleton.class).get();
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (c) 2023 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.inject.runtime;
18+
19+
import java.util.LinkedHashMap;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.Future;
26+
27+
import io.helidon.common.types.TypeName;
28+
import io.helidon.config.Config;
29+
import io.helidon.config.ConfigSources;
30+
import io.helidon.inject.api.ActivationResult;
31+
import io.helidon.inject.api.Bootstrap;
32+
import io.helidon.inject.api.InjectionServices;
33+
import io.helidon.inject.api.Services;
34+
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
39+
import static org.hamcrest.CoreMatchers.notNullValue;
40+
import static org.hamcrest.MatcherAssert.assertThat;
41+
import static org.junit.jupiter.api.Assertions.fail;
42+
43+
class LockContentionTest {
44+
final int COUNT = 100;
45+
46+
final ExecutorService es = Executors.newFixedThreadPool(16);
47+
final Config config = Config.builder(
48+
ConfigSources.create(
49+
Map.of("inject.permits-dynamic", "true"), "config-1"))
50+
.disableEnvironmentVariablesSource()
51+
.disableSystemPropertiesSource()
52+
.build();
53+
54+
@BeforeEach
55+
void init() {
56+
InjectionServices.globalBootstrap(Bootstrap.builder().config(config).build());
57+
}
58+
59+
@AfterEach
60+
void tearDown() {
61+
SimpleInjectionTestingSupport.resetAll();
62+
}
63+
64+
@Test
65+
// we cannot interlace shutdown with startups here - so instead we are checking to ensure N threads can call startup
66+
// and then N threads can call shutdown in parallel, but in phases
67+
void lockContention() {
68+
Map<String, Future<?>> result = new ConcurrentHashMap<>();
69+
for (int i = 1; i <= COUNT; i++) {
70+
result.put("start run:" + i, es.submit(this::start));
71+
}
72+
73+
verify(result);
74+
result.clear();
75+
76+
for (int i = 1; i <= COUNT; i++) {
77+
result.put("shutdown run:" + i, es.submit(this::shutdown));
78+
}
79+
80+
verify(result);
81+
}
82+
83+
void verify(Map<String, Future<?>> result) {
84+
result.forEach((k, v) -> {
85+
try {
86+
assertThat(k, v.get(), notNullValue());
87+
} catch (Exception e) {
88+
fail("failed on " + k, e);
89+
}
90+
});
91+
}
92+
93+
Services start() {
94+
return Objects.requireNonNull(InjectionServices.realizedServices());
95+
}
96+
97+
Map<TypeName, ActivationResult> shutdown() {
98+
InjectionServices injectionServices = InjectionServices.injectionServices().orElseThrow();
99+
Map<TypeName, ActivationResult> result = new LinkedHashMap<>();
100+
Map<TypeName, ActivationResult> round;
101+
do {
102+
round = injectionServices.shutdown().orElseThrow();
103+
result.putAll(round);
104+
} while (!round.isEmpty());
105+
return result;
106+
}
107+
108+
}

nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/LoomServer.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -276,19 +276,15 @@ private void stopIt() {
276276

277277
private void startIt() {
278278
long now = System.currentTimeMillis();
279-
// TODO parallel start breaks pico - issue
280-
// boolean result = parallel("start", ServerListener::start);
281-
for (ServerListener listener : listeners.values()) {
282-
listener.start();
279+
boolean result = parallel("start", ServerListener::start);
280+
if (!result) {
281+
LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down");
282+
parallel("stop", ServerListener::stop);
283+
if (startFutures != null) {
284+
startFutures.forEach(future -> future.future().cancel(true));
285+
}
286+
return;
283287
}
284-
// if (!result) {
285-
// LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down");
286-
// parallel("stop", ServerListener::stop);
287-
// if (startFutures != null) {
288-
// startFutures.forEach(future -> future.future().cancel(true));
289-
// }
290-
// return;
291-
// }
292288
if (registerShutdownHook) {
293289
registerShutdownHook();
294290
}

0 commit comments

Comments
 (0)