Skip to content

Commit eb2e900

Browse files
committed
Add prefix for topics
- Add prefix for topics - Add extra prefix for userEvent topic only
1 parent 1753946 commit eb2e900

File tree

16 files changed

+66
-22
lines changed

16 files changed

+66
-22
lines changed

ansible/group_vars/all

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ registry:
138138
confdir: "{{ config_root_dir }}/registry"
139139

140140
kafka:
141+
topicsPrefix: "{{ kafka_topics_prefix | default('') }}"
142+
topicsUserEventPrefix: "{{ kafka_topics_userEvent_prefix | default(kafka_topics_prefix) | default('') }}"
141143
ssl:
142144
client_authentication: required
143145
keystore:

ansible/roles/controller/tasks/deploy.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@
176176
"{{ kafka_topics_health_retentionMS | default() }}"
177177
"CONFIG_whisk_kafka_topics_health_segmentBytes":
178178
"{{ kafka_topics_health_segmentBytes | default() }}"
179+
"CONFIG_whisk_kafka_topics_prefix":
180+
"{{ kafka.topicsPrefix }}"
181+
"CONFIG_whisk_kafka_topics_userEvent_prefix":
182+
"{{ kafka.topicsUserEventPrefix }}"
179183
"CONFIG_whisk_kafka_common_securityProtocol":
180184
"{{ kafka.protocol }}"
181185
"CONFIG_whisk_kafka_common_sslTruststoreLocation":

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@
216216
"CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}"
217217
"CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}"
218218
"CONFIG_whisk_kakfa_topics_invoker_segmentBytes": "{{ kafka_topics_invoker_segmentBytes | default() }}"
219+
"CONFIG_whisk_kafka_topics_prefix": "{{ kafka.topicsPrefix }}"
220+
"CONFIG_whisk_kafka_topics_userEvent_prefix": "{{ kafka.topicsUserEventPrefix }}"
219221
"CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
220222
"CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
221223
"CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"

common/scala/src/main/resources/application.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ whisk {
193193
retention-bytes = 1073741824
194194
retention-ms = 3600000
195195
}
196+
prefix = ""
197+
user-event {
198+
prefix = ""
199+
}
196200
}
197201

198202
metrics {

common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ object UserEvents {
2828

2929
val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
3030

31+
val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
32+
3133
def send(producer: MessageProducer, em: => EventMessage) = {
3234
if (enabled) {
33-
producer.send("events", em)
35+
producer.send(userEventTopicPrefix + "events", em)
3436
}
3537
}
3638
}

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ object ConfigKeys {
212212
val kafkaProducer = s"$kafka.producer"
213213
val kafkaConsumer = s"$kafka.consumer"
214214
val kafkaTopics = s"$kafka.topics"
215+
val kafkaTopicsPrefix = s"$kafkaTopics.prefix"
216+
val kafkaTopicsUserEventPrefix = s"$kafkaTopics.user-event.prefix"
215217

216218
val memory = "whisk.memory"
217219
val timeLimit = "whisk.time-limit"

common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ package org.apache.openwhisk.core.ack
1919

2020
import org.apache.kafka.common.errors.RecordTooLargeException
2121
import org.apache.openwhisk.common.{Logging, TransactionId}
22+
import org.apache.openwhisk.core.ConfigKeys
2223
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
2324
import org.apache.openwhisk.core.entity._
24-
25+
import pureconfig._
2526
import scala.concurrent.{ExecutionContext, Future}
2627
import scala.util.{Failure, Success}
2728

2829
class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventSender: Option[EventSender])(
2930
implicit logging: Logging,
3031
ec: ExecutionContext)
3132
extends ActiveAck {
33+
34+
private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
35+
3236
override def apply(tid: TransactionId,
3337
activationResult: WhiskActivation,
3438
blockingInvoke: Boolean,
@@ -38,7 +42,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
3842
implicit val transid: TransactionId = tid
3943

4044
def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
41-
producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
45+
producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen {
4246
case Success(_) =>
4347
val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
4448
logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")

common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,11 @@ import scala.concurrent.duration.DurationInt
2424
import scala.util.Failure
2525
import scala.util.Success
2626
import scala.util.Try
27-
2827
import akka.actor.ActorSystem
2928
import akka.actor.Props
3029
import spray.json._
3130
import org.apache.openwhisk.common.Logging
32-
import org.apache.openwhisk.core.WhiskConfig
31+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3332
import org.apache.openwhisk.core.connector.Message
3433
import org.apache.openwhisk.core.connector.MessageFeed
3534
import org.apache.openwhisk.core.connector.MessagingProvider
@@ -41,6 +40,7 @@ import org.apache.openwhisk.core.entity.WhiskPackage
4140
import org.apache.openwhisk.core.entity.WhiskRule
4241
import org.apache.openwhisk.core.entity.WhiskTrigger
4342
import org.apache.openwhisk.spi.SpiLoader
43+
import pureconfig._
4444

4545
case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
4646
override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint
@@ -101,5 +101,6 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
101101
}
102102

103103
object RemoteCacheInvalidation {
104-
val cacheInvalidationTopic = "cacheInvalidation"
104+
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
105+
val cacheInvalidationTopic = topicPrefix + "cacheInvalidation"
105106
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import spray.json.DefaultJsonProtocol._
3232
import spray.json._
3333
import org.apache.openwhisk.common.Https.HttpsConfig
3434
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
35+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3636
import org.apache.openwhisk.core.connector.MessagingProvider
3737
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3838
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -187,6 +187,9 @@ object Controller {
187187
protected val interface = loadConfigOrThrow[String]("whisk.controller.interface")
188188
protected val readinessThreshold = loadConfig[Double]("whisk.controller.readiness-fraction")
189189

190+
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
191+
val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
192+
190193
// requiredProperties is a Map whose keys define properties that must be bound to
191194
// a value, and whose values are default values. A null value in the Map means there is
192195
// no default value specified, so it must appear in the properties file
@@ -263,10 +266,10 @@ object Controller {
263266
val msgProvider = SpiLoader.get[MessagingProvider]
264267

265268
Seq(
266-
("completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
267-
("health", "health", None),
268-
("cacheInvalidation", "cache-invalidation", None),
269-
("events", "events", None)).foreach {
269+
(topicPrefix + "completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
270+
(topicPrefix + "health", "health", None),
271+
(topicPrefix + "cacheInvalidation", "cache-invalidation", None),
272+
(userEventTopicPrefix + "events", "events", None)).foreach {
270273
case (topic, topicConfigurationKey, maxMessageBytes) =>
271274
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
272275
abort(s"failure during msgProvider.ensureTopic for topic $topic")

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import pureconfig.generic.auto._
3030
import org.apache.openwhisk.common.LoggingMarkers._
3131
import org.apache.openwhisk.common._
3232
import org.apache.openwhisk.core.connector._
33+
import org.apache.openwhisk.core.controller.Controller
3334
import org.apache.openwhisk.core.entity._
3435
import org.apache.openwhisk.core.entity.size._
3536
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
@@ -177,7 +178,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
177178
invoker: InvokerInstanceId): Future[RecordMetadata] = {
178179
implicit val transid: TransactionId = msg.transid
179180

180-
val topic = s"invoker${invoker.toInt}"
181+
val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
181182

182183
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
183184
val start = transid.started(

0 commit comments

Comments
 (0)