Skip to content

Commit 43a9cd6

Browse files
bdoyle0182Brendan Doyle
authored andcommitted
upgrade kafka client library to 2.8.2 (apache#5400)
* upgrade kafka client library * attempt build upgrading embedded kafka * attempt to fix standalone server startup test * bump to kafka client to latest patch * revert kafka test timeout config change --------- Co-authored-by: Brendan Doyle <[email protected]> (cherry picked from commit 6bc559d)
1 parent 034b458 commit 43a9cd6

File tree

8 files changed

+47
-30
lines changed

8 files changed

+47
-30
lines changed

common/scala/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ dependencies {
6161
api "commons-codec:commons-codec:1.9"
6262
api "commons-io:commons-io:2.11.0"
6363
api "commons-collections:commons-collections:3.2.2"
64-
api "org.apache.kafka:kafka-clients:2.4.0"
64+
api "org.apache.kafka:kafka-clients:2.8.2"
6565
api "org.apache.httpcomponents:httpclient:4.5.5"
6666
api "com.fasterxml.uuid:java-uuid-generator:3.1.3"
6767
api "com.github.ben-manes.caffeine:caffeine:2.6.2"

core/monitoring/user-events/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ dependencies {
4545

4646
testImplementation "junit:junit:4.11"
4747
testImplementation "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8"
48-
testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
48+
testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
4949
constraints {
50-
testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
50+
testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
5151
testImplementation('org.apache.avro:avro:1.11.1') {
5252
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
5353
}

core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.openwhisk.core.monitoring.metrics
1919

20-
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
21-
import net.manub.embeddedkafka.EmbeddedKafka
20+
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
21+
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
2222
import org.scalatest._
2323
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
2424

@@ -30,11 +30,25 @@ abstract class KafkaSpecBase
3030
with ScalaFutures
3131
with FlatSpecLike
3232
with EmbeddedKafka
33-
with EmbeddedKafkaLike
3433
with IntegrationPatience
3534
with Eventually
3635
with EventsTestHelper { this: Suite =>
3736
implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
3837
override val sleepAfterProduce: FiniteDuration = 10.seconds
3938
override protected val topicCreationTimeout = 60.seconds
39+
override protected val producerPublishTimeout: FiniteDuration = 60.seconds
40+
41+
lazy implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
42+
43+
override def bootstrapServers = s"localhost:${embeddedKafkaConfig.kafkaPort}"
44+
45+
override def setUp(): Unit = {
46+
EmbeddedKafka.start()(embeddedKafkaConfig)
47+
super.setUp()
48+
}
49+
50+
override def cleanUp(): Unit = {
51+
super.cleanUp()
52+
EmbeddedKafka.stop()
53+
}
4054
}

core/standalone/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,9 @@ dependencies {
169169
implementation project(':tools:admin')
170170
implementation "org.rogach:scallop_${gradle.scala.depVersion}:3.3.2"
171171

172-
implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
172+
implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
173173
constraints {
174-
implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
174+
implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
175175
implementation('org.apache.avro:avro:1.11.1') {
176176
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
177177
}

core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
package org.apache.openwhisk.standalone
1919

2020
import java.io.File
21-
2221
import akka.actor.ActorSystem
2322
import kafka.server.KafkaConfig
24-
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
23+
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
2524
import org.apache.commons.io.FileUtils
2625
import org.apache.openwhisk.common.{Logging, TransactionId}
2726
import org.apache.openwhisk.core.WhiskConfig
@@ -30,6 +29,7 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId
3029
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
3130
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}
3231

32+
import java.nio.file.FileSystems
3333
import scala.concurrent.{ExecutionContext, Future}
3434
import scala.reflect.io.Directory
3535
import scala.util.Try
@@ -66,8 +66,10 @@ class KafkaLauncher(
6666
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)
6767

6868
val t = Try {
69-
EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
70-
EmbeddedKafka.startKafka(createDir("kafka"))
69+
createDir("zookeeper")
70+
createDir("kafka")
71+
EmbeddedKafka.startZooKeeper(FileSystems.getDefault.getPath(workDir.getPath,"zookeeper"))
72+
EmbeddedKafka.startKafka(FileSystems.getDefault.getPath(workDir.getPath,"kafka"))
7173
}
7274

7375
Future

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ gradle.ext.scalafmt = [
7878
]
7979

8080
gradle.ext.akka = [version : '2.6.12']
81-
gradle.ext.akka_kafka = [version : '2.0.2']
81+
gradle.ext.akka_kafka = [version : '2.0.5']
8282
gradle.ext.akka_http = [version : '10.2.4']
8383
gradle.ext.akka_management = [version : '1.0.5']
8484

tests/build.gradle

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,9 @@ dependencies {
239239
because 'swagger-request-validator-core cannot be upgraded to 2.x where vuln is remediated'
240240
}
241241
}
242-
implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
242+
implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
243243
constraints {
244-
implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
244+
implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
245245
implementation('org.apache.avro:avro:1.11.1') {
246246
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
247247
}
@@ -255,7 +255,6 @@ dependencies {
255255
implementation "com.microsoft.azure:azure-cosmos:3.7.6"
256256
implementation 'org.testcontainers:elasticsearch:1.17.6'
257257
implementation 'org.testcontainers:mongodb:1.17.1'
258-
259258
implementation project(':common:scala')
260259
implementation project(':core:controller')
261260
implementation project(':core:scheduler')

tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,17 @@
1717

1818
package org.apache.openwhisk.core.database.cosmosdb.cache
1919
import java.net.UnknownHostException
20-
2120
import akka.Done
2221
import akka.actor.CoordinatedShutdown
23-
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
22+
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
2423
import com.typesafe.config.ConfigFactory
25-
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
24+
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
2625
import org.apache.kafka.common.KafkaException
2726
import org.apache.kafka.common.serialization.StringDeserializer
2827
import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
2928
import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation}
3029
import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
31-
import org.apache.openwhisk.core.entity.{
32-
DocumentReader,
33-
EntityName,
34-
EntityPath,
35-
WhiskDocumentReader,
36-
WhiskEntity,
37-
WhiskEntityJsonFormat,
38-
WhiskPackage
39-
}
30+
import org.apache.openwhisk.core.entity.{DocumentReader, EntityName, EntityPath, WhiskDocumentReader, WhiskEntity, WhiskEntityJsonFormat, WhiskPackage}
4031
import org.junit.runner.RunWith
4132
import org.scalatest.concurrent.ScalaFutures
4233
import org.scalatest.junit.JUnitRunner
@@ -48,7 +39,6 @@ import scala.util.Random
4839
@RunWith(classOf[JUnitRunner])
4940
class CacheInvalidatorTests
5041
extends ScalatestKafkaSpec(6061)
51-
with EmbeddedKafkaLike
5242
with EmbeddedKafka
5343
with CosmosDBTestSupport
5444
with Matchers
@@ -58,7 +48,19 @@ class CacheInvalidatorTests
5848
private implicit val logging = new AkkaLogging(system.log)
5949
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
6050

61-
override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
51+
def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
52+
53+
override def bootstrapServers = s"localhost:$kafkaPort"
54+
55+
override def setUp(): Unit = {
56+
EmbeddedKafka.start()(createKafkaConfig)
57+
super.setUp()
58+
}
59+
60+
override def cleanUp(): Unit = {
61+
super.cleanUp()
62+
EmbeddedKafka.stop()
63+
}
6264

6365
behavior of "CosmosDB CacheInvalidation"
6466

0 commit comments

Comments
 (0)