diff --git a/common/scala/build.gradle b/common/scala/build.gradle index fbbe74a6092..3c165e635d7 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -61,7 +61,7 @@ dependencies { api "commons-codec:commons-codec:1.9" api "commons-io:commons-io:2.11.0" api "commons-collections:commons-collections:3.2.2" - api "org.apache.kafka:kafka-clients:2.4.0" + api "org.apache.kafka:kafka-clients:2.8.2" api "org.apache.httpcomponents:httpclient:4.5.5" api "com.fasterxml.uuid:java-uuid-generator:3.1.3" api "com.github.ben-manes.caffeine:caffeine:2.6.2" diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle index c6f4a9fc42e..42269d1f65f 100644 --- a/core/monitoring/user-events/build.gradle +++ b/core/monitoring/user-events/build.gradle @@ -45,9 +45,9 @@ dependencies { testImplementation "junit:junit:4.11" testImplementation "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8" - testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0" + testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1" constraints { - testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0") + testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1") testImplementation('org.apache.avro:avro:1.11.1') { because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns' } diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala index 5ba7491914f..e9998b19f15 100644 --- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala +++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala @@ -17,8 +17,8 @@ package org.apache.openwhisk.core.monitoring.metrics -import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec} -import net.manub.embeddedkafka.EmbeddedKafka +import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.scalatest._ import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} @@ -30,11 +30,25 @@ abstract class KafkaSpecBase with ScalaFutures with FlatSpecLike with EmbeddedKafka - with EmbeddedKafkaLike with IntegrationPatience with Eventually with EventsTestHelper { this: Suite => implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute) override val sleepAfterProduce: FiniteDuration = 10.seconds override protected val topicCreationTimeout = 60.seconds + override protected val producerPublishTimeout: FiniteDuration = 60.seconds + + lazy implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort) + + override def bootstrapServers = s"localhost:${embeddedKafkaConfig.kafkaPort}" + + override def setUp(): Unit = { + EmbeddedKafka.start()(embeddedKafkaConfig) + super.setUp() + } + + override def cleanUp(): Unit = { + super.cleanUp() + EmbeddedKafka.stop() + } } diff --git a/core/standalone/build.gradle b/core/standalone/build.gradle index 031e529d85a..46c526b9bee 100644 --- a/core/standalone/build.gradle +++ b/core/standalone/build.gradle @@ -169,9 +169,9 @@ dependencies { implementation project(':tools:admin') implementation "org.rogach:scallop_${gradle.scala.depVersion}:3.3.2" - implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0" + implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1" constraints { - implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0") + implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1") implementation('org.apache.avro:avro:1.11.1') { because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns' } diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala index 2c6790868e5..41e91e4a063 100644 --- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala +++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala @@ -18,10 +18,9 @@ package org.apache.openwhisk.standalone import java.io.File - import akka.actor.ActorSystem import kafka.server.KafkaConfig -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.commons.io.FileUtils import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.WhiskConfig @@ -30,6 +29,7 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider} import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd} +import java.nio.file.FileSystems import scala.concurrent.{ExecutionContext, Future} import scala.reflect.io.Directory import scala.util.Try @@ -66,8 +66,10 @@ class KafkaLauncher( EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps) val t = Try { - EmbeddedKafka.startZooKeeper(createDir("zookeeper")) - EmbeddedKafka.startKafka(createDir("kafka")) + createDir("zookeeper") + createDir("kafka") + EmbeddedKafka.startZooKeeper(FileSystems.getDefault.getPath(workDir.getPath,"zookeeper")) + EmbeddedKafka.startKafka(FileSystems.getDefault.getPath(workDir.getPath,"kafka")) } Future diff --git a/settings.gradle b/settings.gradle index 0ba8ae6befd..9c9cca5e5fb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -78,7 +78,7 @@ gradle.ext.scalafmt = [ ] gradle.ext.akka = [version : '2.6.12'] -gradle.ext.akka_kafka = [version : '2.0.2'] +gradle.ext.akka_kafka = [version : '2.0.5'] gradle.ext.akka_http = [version : '10.2.4'] gradle.ext.akka_management = [version : '1.0.5'] diff --git a/tests/build.gradle b/tests/build.gradle index fe0ef9eeb52..61e07b25848 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -239,9 +239,9 @@ dependencies { because 'swagger-request-validator-core cannot be upgraded to 2.x where vuln is remediated' } } - implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0" + implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1" constraints { - implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0") + implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1") implementation('org.apache.avro:avro:1.11.1') { because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns' } @@ -255,7 +255,6 @@ dependencies { implementation "com.microsoft.azure:azure-cosmos:3.7.6" implementation 'org.testcontainers:elasticsearch:1.17.6' implementation 'org.testcontainers:mongodb:1.17.1' - implementation project(':common:scala') implementation project(':core:controller') implementation project(':core:scheduler') diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala index e4d252801b4..a40ed64df1f 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala @@ -17,26 +17,17 @@ package org.apache.openwhisk.core.database.cosmosdb.cache import java.net.UnknownHostException - import akka.Done import akka.actor.CoordinatedShutdown -import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec} +import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec import com.typesafe.config.ConfigFactory -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.serialization.StringDeserializer import org.apache.openwhisk.common.{AkkaLogging, TransactionId} import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation} import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport} -import org.apache.openwhisk.core.entity.{ - DocumentReader, - EntityName, - EntityPath, - WhiskDocumentReader, - WhiskEntity, - WhiskEntityJsonFormat, - WhiskPackage -} +import org.apache.openwhisk.core.entity.{DocumentReader, EntityName, EntityPath, WhiskDocumentReader, WhiskEntity, WhiskEntityJsonFormat, WhiskPackage} import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner @@ -48,7 +39,6 @@ import scala.util.Random @RunWith(classOf[JUnitRunner]) class CacheInvalidatorTests extends ScalatestKafkaSpec(6061) - with EmbeddedKafkaLike with EmbeddedKafka with CosmosDBTestSupport with Matchers @@ -58,7 +48,19 @@ class CacheInvalidatorTests private implicit val logging = new AkkaLogging(system.log) implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds) - override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort) + def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort) + + override def bootstrapServers = s"localhost:$kafkaPort" + + override def setUp(): Unit = { + EmbeddedKafka.start()(createKafkaConfig) + super.setUp() + } + + override def cleanUp(): Unit = { + super.cleanUp() + EmbeddedKafka.stop() + } behavior of "CosmosDB CacheInvalidation"