Skip to content

Commit 291b3b2

Browse files
authored
Merge db7e4c8 into de3e0a8
2 parents de3e0a8 + db7e4c8 commit 291b3b2

File tree

3 files changed

+3
-2
lines changed

3 files changed

+3
-2
lines changed

common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class KafkaConsumerConnector(
5555

5656
// Currently consumed offset, is used to calculate the topic lag.
5757
// It is updated from one thread in "peek", no concurrent data structure is necessary
58+
// Note: Currently, this value used for metric reporting will not be accurate if using a multi-partition topic.
5859
private var offset: Long = 0
5960

6061
// Markers for metrics, initialized only once

common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ object KafkaMessagingProvider extends MessagingProvider {
6464

6565
Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
6666
.flatMap(client => {
67-
val partitions = 1
67+
val partitions = topicConfig.getOrElse("partitions", "1").toInt
6868
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
6969

7070
def createTopic(retries: Int = 5): Try[Unit] = {

common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class KafkaProducerConnector(
5151
/** Sends msg to topic. This is an asynchronous operation. */
5252
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
5353
implicit val transid: TransactionId = msg.transid
54-
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
54+
val record = new ProducerRecord[String, String](topic, msg.serialize)
5555
val produced = Promise[ResultMetadata]()
5656

5757
Future {

0 commit comments

Comments
 (0)