Skip to content

Commit c9a9f57

Browse files
authored
Fix ending span in Ktor plugin (#11726)
1 parent b00ccd7 commit c9a9f57

File tree

4 files changed

+57
-11
lines changed

4 files changed

+57
-11
lines changed

instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/client/KtorClientTracing.kt

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import io.opentelemetry.context.Context
1616
import io.opentelemetry.context.propagation.ContextPropagators
1717
import io.opentelemetry.extension.kotlin.asContextElement
1818
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
19+
import kotlinx.coroutines.InternalCoroutinesApi
20+
import kotlinx.coroutines.job
21+
import kotlinx.coroutines.launch
1922
import kotlinx.coroutines.withContext
2023

2124
class KtorClientTracing internal constructor(
@@ -83,23 +86,21 @@ class KtorClientTracing internal constructor(
8386
}
8487
}
8588

89+
@OptIn(InternalCoroutinesApi::class)
8690
private fun installSpanEnd(plugin: KtorClientTracing, scope: HttpClient) {
8791
val endSpanPhase = PipelinePhase("OpenTelemetryEndSpan")
8892
scope.receivePipeline.insertPhaseBefore(HttpReceivePipeline.State, endSpanPhase)
8993

9094
scope.receivePipeline.intercept(endSpanPhase) {
9195
val openTelemetryContext = it.call.attributes.getOrNull(openTelemetryContextKey)
96+
openTelemetryContext ?: return@intercept
9297

93-
if (openTelemetryContext != null) {
94-
try {
95-
withContext(openTelemetryContext.asContextElement()) { proceed() }
96-
plugin.endSpan(openTelemetryContext, it.call, null)
97-
} catch (e: Throwable) {
98-
plugin.endSpan(openTelemetryContext, it.call, e)
99-
throw e
100-
}
101-
} else {
102-
proceed()
98+
scope.launch {
99+
val job = it.call.coroutineContext.job
100+
job.join()
101+
val cause = job.getCancellationException()
102+
103+
plugin.endSpan(openTelemetryContext, it.call, cause)
103104
}
104105
}
105106
}

instrumentation/ktor/ktor-2.0/testing/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/client/AbstractKtorHttpClientTest.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@ import io.ktor.client.engine.cio.*
1010
import io.ktor.client.plugins.*
1111
import io.ktor.client.request.*
1212
import io.ktor.http.*
13+
import io.opentelemetry.api.trace.SpanKind
1314
import io.opentelemetry.context.Context
1415
import io.opentelemetry.extension.kotlin.asContextElement
1516
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
1617
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
1718
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions
1819
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES
20+
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat
21+
import io.opentelemetry.sdk.testing.assertj.TraceAssert
1922
import io.opentelemetry.semconv.NetworkAttributes
2023
import kotlinx.coroutines.*
24+
import org.junit.jupiter.api.Test
2125
import java.net.URI
26+
import java.util.function.Consumer
2227

2328
abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {
2429

@@ -71,4 +76,24 @@ abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBu
7176
}
7277
}
7378
}
79+
80+
@Test
81+
fun checkSpanEndsAfterBodyReceived() {
82+
val method = "GET"
83+
val path = "/long-request"
84+
val uri = resolveAddress(path)
85+
val responseCode = doRequest(method, uri)
86+
87+
assertThat(responseCode).isEqualTo(200)
88+
89+
testing.waitAndAssertTraces(
90+
Consumer { trace: TraceAssert ->
91+
val span = trace.getSpan(0)
92+
assertThat(span).hasKind(SpanKind.CLIENT)
93+
assertThat(span.endEpochNanos - span.startEpochNanos >= 1_000_000_000)
94+
.describedAs("Span duration should be at least 1000ms")
95+
.isTrue()
96+
}
97+
)
98+
}
7499
}

testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ static SpanDataAssert assertServerSpan(SpanDataAssert span) {
10481048
return span.hasName("test-http-server").hasKind(SpanKind.SERVER);
10491049
}
10501050

1051-
private int doRequest(String method, URI uri) throws Exception {
1051+
protected int doRequest(String method, URI uri) throws Exception {
10521052
return doRequest(method, uri, Collections.emptyMap());
10531053
}
10541054

testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestServer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.opentelemetry.instrumentation.test.server.http.RequestContextGetter;
1818
import io.opentelemetry.testing.internal.armeria.common.HttpData;
1919
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
20+
import io.opentelemetry.testing.internal.armeria.common.HttpResponseWriter;
2021
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
2122
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
2223
import io.opentelemetry.testing.internal.armeria.common.ResponseHeadersBuilder;
@@ -29,6 +30,7 @@
2930
import java.nio.file.Paths;
3031
import java.security.KeyStore;
3132
import java.time.Duration;
33+
import java.util.concurrent.TimeUnit;
3234
import javax.net.ssl.KeyManagerFactory;
3335

3436
public final class HttpClientTestServer extends ServerExtension {
@@ -99,6 +101,24 @@ protected void configure(ServerBuilder sb) throws Exception {
99101
"/read-timeout",
100102
(ctx, req) ->
101103
HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofSeconds(20)))
104+
.service(
105+
"/long-request",
106+
(ctx, req) -> {
107+
HttpResponseWriter writer = HttpResponse.streaming();
108+
writer.write(ResponseHeaders.of(HttpStatus.OK));
109+
writer.write(HttpData.ofUtf8("Hello"));
110+
111+
ctx.eventLoop()
112+
.schedule(
113+
() -> {
114+
writer.write(HttpData.ofUtf8("World"));
115+
writer.close();
116+
},
117+
1,
118+
TimeUnit.SECONDS);
119+
120+
return writer;
121+
})
102122
.decorator(
103123
(delegate, ctx, req) -> {
104124
for (String field : openTelemetry.getPropagators().getTextMapPropagator().fields()) {

0 commit comments

Comments
 (0)