Skip to content

Commit 9259ce8

Browse files
authored
Add context propagation for rector schedulers (#10311)
1 parent 0949ae2 commit 9259ce8

File tree

5 files changed

+87
-2
lines changed

5 files changed

+87
-2
lines changed

instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java

+47
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.lang.invoke.MethodHandles;
3232
import java.util.function.BiFunction;
3333
import java.util.function.Function;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
3436
import javax.annotation.Nullable;
3537
import org.reactivestreams.Publisher;
3638
import reactor.core.CoreSubscriber;
@@ -40,9 +42,11 @@
4042
import reactor.core.publisher.Hooks;
4143
import reactor.core.publisher.Mono;
4244
import reactor.core.publisher.Operators;
45+
import reactor.core.scheduler.Schedulers;
4346

4447
/** Based on Spring Sleuth's Reactor instrumentation. */
4548
public final class ContextPropagationOperator {
49+
private static final Logger logger = Logger.getLogger(ContextPropagationOperator.class.getName());
4650

4751
private static final Object VALUE = new Object();
4852

@@ -52,6 +56,8 @@ public final class ContextPropagationOperator {
5256
@Nullable
5357
private static final MethodHandle FLUX_CONTEXT_WRITE_METHOD = getContextWriteMethod(Flux.class);
5458

59+
@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
60+
5561
@Nullable
5662
private static MethodHandle getContextWriteMethod(Class<?> type) {
5763
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
@@ -68,6 +74,18 @@ private static MethodHandle getContextWriteMethod(Class<?> type) {
6874
return null;
6975
}
7076

77+
@Nullable
78+
private static MethodHandle getSchedulersHookMethod() {
79+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
80+
try {
81+
return lookup.findStatic(
82+
Schedulers.class, "onScheduleHook", methodType(void.class, String.class, Function.class));
83+
} catch (NoSuchMethodException | IllegalAccessException e) {
84+
// ignore
85+
}
86+
return null;
87+
}
88+
7189
public static ContextPropagationOperator create() {
7290
return builder().build();
7391
}
@@ -137,10 +155,22 @@ public void registerOnEachOperator() {
137155
Hooks.onEachOperator(
138156
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
139157
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
158+
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
140159
enabled = true;
141160
}
142161
}
143162

163+
private static void registerScheduleHook(String key, Function<Runnable, Runnable> function) {
164+
if (SCHEDULERS_HOOK_METHOD == null) {
165+
return;
166+
}
167+
try {
168+
SCHEDULERS_HOOK_METHOD.invoke(key, function);
169+
} catch (Throwable throwable) {
170+
logger.log(Level.WARNING, "Failed to install scheduler hook", throwable);
171+
}
172+
}
173+
144174
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
145175
public void resetOnEachOperator() {
146176
synchronized (lock) {
@@ -312,4 +342,21 @@ public Object scanUnsafe(Scannable.Attr attr) {
312342
return null;
313343
}
314344
}
345+
346+
private static class RunnableWrapper implements Runnable {
347+
private final Runnable delegate;
348+
private final Context context;
349+
350+
RunnableWrapper(Runnable delegate) {
351+
this.delegate = delegate;
352+
context = Context.current();
353+
}
354+
355+
@Override
356+
public void run() {
357+
try (Scope ignore = context.makeCurrent()) {
358+
delegate.run();
359+
}
360+
}
361+
}
315362
}

instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,15 @@ private static Stream<Arguments> provideParameters() {
206206
"/foo-delayed",
207207
"/foo-delayed",
208208
"getFooDelayed",
209-
new FooModel(3L, "delayed").toString()))));
209+
new FooModel(3L, "delayed").toString()))),
210+
Arguments.of(
211+
named(
212+
"annotation API without parameters no mono",
213+
new Parameter(
214+
"/foo-no-mono",
215+
"/foo-no-mono",
216+
"getFooModelNoMono",
217+
new FooModel(0L, "DEFAULT").toString()))));
210218
}
211219

212220
@ParameterizedTest(name = "{index}: {0}")

instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/TestController.java

+5
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ Mono<FooModel> getFooDelayedMono(@PathVariable("id") long id) {
6363
return Mono.just(id).delayElement(Duration.ofMillis(100)).map(TestController::tracedMethod);
6464
}
6565

66+
@GetMapping("/foo-no-mono")
67+
FooModel getFooModelNoMono() {
68+
return new FooModel(0L, "DEFAULT");
69+
}
70+
6671
private static FooModel tracedMethod(long id) {
6772
tracer.spanBuilder("tracedMethod").startSpan().end();
6873
return new FooModel(id, "tracedMethod");

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerInstrumentationTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,17 @@
55

66
package io.opentelemetry.instrumentation.spring.webflux.v5_3;
77

8+
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS;
9+
import static org.assertj.core.api.Assertions.assertThat;
10+
811
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
912
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
1013
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
1114
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions;
1215
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
16+
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
17+
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
18+
import org.junit.jupiter.api.Test;
1319
import org.junit.jupiter.api.extension.RegisterExtension;
1420
import org.springframework.context.ConfigurableApplicationContext;
1521

@@ -47,4 +53,17 @@ protected void configure(HttpServerTestOptions options) {
4753

4854
options.disableTestNonStandardHttpMethod();
4955
}
56+
57+
@Test
58+
void noMono() {
59+
ServerEndpoint endpoint = new ServerEndpoint("NO_MONO", "no-mono", 200, "success");
60+
String method = "GET";
61+
AggregatedHttpRequest request = request(endpoint, method);
62+
AggregatedHttpResponse response = client.execute(request).aggregate().join();
63+
64+
assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus());
65+
assertThat(response.contentUtf8()).isEqualTo(SUCCESS.getBody());
66+
67+
assertTheTraces(1, null, null, null, method, endpoint);
68+
}
5069
}

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/TestWebfluxSpringBootApp.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ WebFilter telemetryFilter() {
5757
.setCapturedServerResponseHeaders(
5858
singletonList(AbstractHttpServerTest.TEST_RESPONSE_HEADER))
5959
.build()
60-
.createWebFilter();
60+
.createWebFilterAndRegisterReactorHook();
6161
}
6262

6363
@Controller
@@ -69,6 +69,12 @@ Flux<String> success() {
6969
return Flux.defer(() -> Flux.just(controller(SUCCESS, SUCCESS::getBody)));
7070
}
7171

72+
@RequestMapping("/no-mono")
73+
@ResponseBody
74+
String noMono() {
75+
return controller(SUCCESS, SUCCESS::getBody);
76+
}
77+
7278
@RequestMapping("/query")
7379
@ResponseBody
7480
Mono<String> query_param(@RequestParam("some") String param) {

0 commit comments

Comments
 (0)