Skip to content

Commit 4325eb2

Browse files
authored
Instrument ConnectionSource in Akka/Pekko HTTP Servers (#11103)
1 parent 95ca5ad commit 4325eb2

File tree

7 files changed

+354
-3
lines changed

7 files changed

+354
-3
lines changed

instrumentation/akka/akka-actor-fork-join-2.5/javaagent/build.gradle.kts

+14-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,23 @@ plugins {
44
}
55

66
muzzle {
7+
// Akka's fork-join was removed in 2.6, replaced with the normal java.concurrent version
78
pass {
89
group.set("com.typesafe.akka")
910
module.set("akka-actor_2.11")
10-
versions.set("[2.5,)")
11+
versions.set("[2.5,)") // Scala 2.11 support was dropped after 2.5, so no 2.6 versions exist with this name
12+
assertInverse.set(true)
13+
}
14+
pass {
15+
group.set("com.typesafe.akka")
16+
module.set("akka-actor_2.12")
17+
versions.set("[2.5,2.6)")
18+
assertInverse.set(true)
19+
}
20+
pass {
21+
group.set("com.typesafe.akka")
22+
module.set("akka-actor_2.13")
23+
versions.set("[2.5.23,2.6)") // Scala 2.13 support was added in the middle of the 2.5 release
1124
assertInverse.set(true)
1225
}
1326
}

instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public boolean isIndyModule() {
3636

3737
@Override
3838
public List<TypeInstrumentation> typeInstrumentations() {
39-
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
39+
return asList(
40+
new HttpExtServerInstrumentation(),
41+
new GraphInterpreterInstrumentation(),
42+
new AkkaHttpServerSourceInstrumentation());
4043
}
4144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import akka.http.scaladsl.model.HttpRequest;
12+
import akka.http.scaladsl.model.HttpResponse;
13+
import akka.stream.scaladsl.Flow;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
20+
public class AkkaHttpServerSourceInstrumentation implements TypeInstrumentation {
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("akka.http.scaladsl.Http$IncomingConnection");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("handleWith").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
30+
this.getClass().getName() + "$AkkaBindAndHandleAdvice");
31+
}
32+
33+
@SuppressWarnings("unused")
34+
public static class AkkaBindAndHandleAdvice {
35+
36+
@Advice.OnMethodEnter(suppress = Throwable.class)
37+
public static void wrapHandler(
38+
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
39+
handler = AkkaFlowWrapper.wrap(handler);
40+
}
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.akkahttp
7+
8+
import akka.actor.ActorSystem
9+
import akka.http.scaladsl.Http
10+
import akka.http.scaladsl.Http.ServerBinding
11+
import akka.http.scaladsl.model.StatusCodes.Found
12+
import akka.http.scaladsl.server.Directives._
13+
import akka.stream.ActorMaterializer
14+
import akka.stream.scaladsl.Sink
15+
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
16+
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
17+
18+
import java.util.function.Supplier
19+
import scala.concurrent.Await
20+
21+
object AkkaHttpTestServerSourceWebServer {
22+
implicit val system = ActorSystem("my-system")
23+
implicit val materializer = ActorMaterializer()
24+
// needed for the future flatMap/onComplete in the end
25+
implicit val executionContext = system.dispatcher
26+
27+
var route = get {
28+
concat(
29+
path(SUCCESS.rawPath()) {
30+
complete(
31+
AbstractHttpServerTest.controller(SUCCESS, supplier(SUCCESS.getBody))
32+
)
33+
},
34+
path(INDEXED_CHILD.rawPath()) {
35+
parameterMap { map =>
36+
val supplier = new Supplier[String] {
37+
def get(): String = {
38+
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
39+
override def getParameter(name: String): String =
40+
map.get(name).orNull
41+
})
42+
""
43+
}
44+
}
45+
complete(AbstractHttpServerTest.controller(INDEXED_CHILD, supplier))
46+
}
47+
},
48+
path(QUERY_PARAM.rawPath()) {
49+
extractUri { uri =>
50+
complete(
51+
AbstractHttpServerTest
52+
.controller(INDEXED_CHILD, supplier(uri.queryString().orNull))
53+
)
54+
}
55+
},
56+
path(REDIRECT.rawPath()) {
57+
redirect(
58+
AbstractHttpServerTest
59+
.controller(REDIRECT, supplier(REDIRECT.getBody)),
60+
Found
61+
)
62+
},
63+
path(ERROR.rawPath()) {
64+
complete(
65+
500 -> AbstractHttpServerTest
66+
.controller(ERROR, supplier(ERROR.getBody))
67+
)
68+
},
69+
path("path" / LongNumber / "param") { id =>
70+
complete(
71+
AbstractHttpServerTest.controller(PATH_PARAM, supplier(id.toString))
72+
)
73+
},
74+
path(
75+
"test1" / IntNumber / HexIntNumber / LongNumber / HexLongNumber /
76+
DoubleNumber / JavaUUID / Remaining
77+
) { (_, _, _, _, _, _, _) =>
78+
complete(SUCCESS.getBody)
79+
},
80+
pathPrefix("test2") {
81+
concat(
82+
path("first") {
83+
complete(SUCCESS.getBody)
84+
},
85+
path("second") {
86+
complete(SUCCESS.getBody)
87+
}
88+
)
89+
}
90+
)
91+
}
92+
93+
private var binding: ServerBinding = null
94+
95+
def start(port: Int): Unit = synchronized {
96+
if (null == binding) {
97+
import scala.concurrent.duration._
98+
binding = Await.result(
99+
Http()
100+
.bind("localhost", port)
101+
.map(_.handleWith(route))
102+
.to(Sink.ignore)
103+
.run(),
104+
10.seconds
105+
)
106+
}
107+
}
108+
109+
def stop(): Unit = synchronized {
110+
if (null != binding) {
111+
binding.unbind()
112+
system.terminate()
113+
binding = null
114+
}
115+
}
116+
117+
def supplier(string: String): Supplier[String] = {
118+
new Supplier[String] {
119+
def get(): String = {
120+
string
121+
}
122+
}
123+
}
124+
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoHttpServerInstrumentationModule.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public boolean isIndyModule() {
3636

3737
@Override
3838
public List<TypeInstrumentation> typeInstrumentations() {
39-
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
39+
return asList(
40+
new HttpExtServerInstrumentation(),
41+
new GraphInterpreterInstrumentation(),
42+
new PekkoHttpServerSourceInstrumentation());
4043
}
4144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import org.apache.pekko.http.scaladsl.model.HttpRequest;
17+
import org.apache.pekko.http.scaladsl.model.HttpResponse;
18+
import org.apache.pekko.stream.scaladsl.Flow;
19+
20+
public class PekkoHttpServerSourceInstrumentation implements TypeInstrumentation {
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("org.apache.pekko.http.scaladsl.Http$IncomingConnection");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("handleWith").and(takesArgument(0, named("org.apache.pekko.stream.scaladsl.Flow"))),
30+
this.getClass().getName() + "$PekkoBindAndHandleAdvice");
31+
}
32+
33+
@SuppressWarnings("unused")
34+
public static class PekkoBindAndHandleAdvice {
35+
36+
@Advice.OnMethodEnter(suppress = Throwable.class)
37+
public static void wrapHandler(
38+
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
39+
handler = PekkoFlowWrapper.wrap(handler);
40+
}
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
7+
8+
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
9+
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
10+
import org.apache.pekko.actor.ActorSystem
11+
import org.apache.pekko.http.scaladsl.Http
12+
import org.apache.pekko.http.scaladsl.Http.ServerBinding
13+
import org.apache.pekko.http.scaladsl.model.StatusCodes.Found
14+
import org.apache.pekko.http.scaladsl.server.Directives._
15+
import org.apache.pekko.stream.ActorMaterializer
16+
import org.apache.pekko.stream.scaladsl.Sink
17+
18+
import java.util.function.Supplier
19+
import scala.concurrent.Await
20+
21+
object PekkoHttpTestServerSourceWebServer {
22+
implicit val system = ActorSystem("my-system")
23+
implicit val materializer = ActorMaterializer()
24+
// needed for the future flatMap/onComplete in the end
25+
implicit val executionContext = system.dispatcher
26+
27+
var route = get {
28+
concat(
29+
path(SUCCESS.rawPath()) {
30+
complete(
31+
AbstractHttpServerTest.controller(SUCCESS, supplier(SUCCESS.getBody))
32+
)
33+
},
34+
path(INDEXED_CHILD.rawPath()) {
35+
parameterMap { map =>
36+
val supplier = new Supplier[String] {
37+
def get(): String = {
38+
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
39+
override def getParameter(name: String): String =
40+
map.get(name).orNull
41+
})
42+
""
43+
}
44+
}
45+
complete(AbstractHttpServerTest.controller(INDEXED_CHILD, supplier))
46+
}
47+
},
48+
path(QUERY_PARAM.rawPath()) {
49+
extractUri { uri =>
50+
complete(
51+
AbstractHttpServerTest
52+
.controller(INDEXED_CHILD, supplier(uri.queryString().orNull))
53+
)
54+
}
55+
},
56+
path(REDIRECT.rawPath()) {
57+
redirect(
58+
AbstractHttpServerTest
59+
.controller(REDIRECT, supplier(REDIRECT.getBody)),
60+
Found
61+
)
62+
},
63+
path(ERROR.rawPath()) {
64+
complete(
65+
500 -> AbstractHttpServerTest
66+
.controller(ERROR, supplier(ERROR.getBody))
67+
)
68+
},
69+
path("path" / LongNumber / "param") { id =>
70+
complete(
71+
AbstractHttpServerTest.controller(PATH_PARAM, supplier(id.toString))
72+
)
73+
},
74+
path(
75+
"test1" / IntNumber / HexIntNumber / LongNumber / HexLongNumber /
76+
DoubleNumber / JavaUUID / Remaining
77+
) { (_, _, _, _, _, _, _) =>
78+
complete(SUCCESS.getBody)
79+
},
80+
pathPrefix("test2") {
81+
concat(
82+
path("first") {
83+
complete(SUCCESS.getBody)
84+
},
85+
path("second") {
86+
complete(SUCCESS.getBody)
87+
}
88+
)
89+
}
90+
)
91+
}
92+
93+
private var binding: ServerBinding = null
94+
95+
def start(port: Int): Unit = synchronized {
96+
if (null == binding) {
97+
import scala.concurrent.duration._
98+
binding = Await.result(
99+
Http()
100+
.bind("localhost", port)
101+
.map(_.handleWith(route))
102+
.to(Sink.ignore)
103+
.run(),
104+
10.seconds
105+
)
106+
}
107+
}
108+
109+
def stop(): Unit = synchronized {
110+
if (null != binding) {
111+
binding.unbind()
112+
system.terminate()
113+
binding = null
114+
}
115+
}
116+
117+
def supplier(string: String): Supplier[String] = {
118+
new Supplier[String] {
119+
def get(): String = {
120+
string
121+
}
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)