Skip to content

Commit 1c770ae

Browse files
Replace kafka.RecordMetadata with a common ResultMetadata
1 parent 3e3414c commit 1c770ae

File tree

20 files changed

+76
-97
lines changed

20 files changed

+76
-97
lines changed

common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import pureconfig.generic.auto._
2727
import org.apache.openwhisk.common.{Counter, Logging, TransactionId}
2828
import org.apache.openwhisk.connector.kafka.KafkaConfiguration._
2929
import org.apache.openwhisk.core.ConfigKeys
30-
import org.apache.openwhisk.core.connector.{Message, MessageProducer}
30+
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
3131
import org.apache.openwhisk.core.entity.{ByteSize, UUIDs}
3232
import org.apache.openwhisk.utils.Exceptions
3333

@@ -49,17 +49,18 @@ class KafkaProducerConnector(
4949
override def sentCount(): Long = sentCounter.cur
5050

5151
/** Sends msg to topic. This is an asynchronous operation. */
52-
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
52+
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
5353
implicit val transid: TransactionId = msg.transid
5454
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
55-
val produced = Promise[RecordMetadata]()
55+
val produced = Promise[ResultMetadata]()
5656

5757
Future {
5858
blocking {
5959
try {
6060
producer.send(record, new Callback {
6161
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
62-
if (exception == null) produced.trySuccess(metadata)
62+
if (exception == null)
63+
produced.trySuccess(ResultMetadata(metadata.topic(), metadata.partition(), metadata.offset()))
6364
else produced.tryFailure(exception)
6465
}
6566
})
@@ -72,7 +73,7 @@ class KafkaProducerConnector(
7273

7374
produced.future.andThen {
7475
case Success(status) =>
75-
logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
76+
logging.debug(this, s"sent message: ${status.topic}[${status.partition}][${status.offset}]")
7677
sentCounter.next()
7778
case Failure(t) =>
7879
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")

common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
package org.apache.openwhisk.connector.lean
1919

2020
import akka.actor.ActorSystem
21+
2122
import scala.concurrent.Future
22-
import org.apache.kafka.clients.producer.RecordMetadata
23-
import org.apache.kafka.common.TopicPartition
2423
import org.apache.openwhisk.common.Counter
2524
import org.apache.openwhisk.common.Logging
26-
import org.apache.openwhisk.core.connector.Message
27-
import org.apache.openwhisk.core.connector.MessageProducer
25+
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
2826

2927
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
3028
import scala.collection.mutable.Map
@@ -39,15 +37,15 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
3937
override def sentCount(): Long = sentCounter.cur
4038

4139
/** Sends msg to topic. This is an asynchronous operation. */
42-
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
40+
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
4341
implicit val transid = msg.transid
4442

4543
val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())
4644

4745
Future {
4846
queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
4947
sentCounter.next()
50-
new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1)
48+
ResultMetadata(topic, 0, -1)
5149
}
5250
}
5351

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ trait Message {
4848
override def toString = serialize
4949
}
5050

51+
case class ResultMetadata(topic: String, partition: Int, offset: Long)
52+
5153
case class ActivationMessage(override val transid: TransactionId,
5254
action: FullyQualifiedEntityName,
5355
revision: DocRevision,

common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@ package org.apache.openwhisk.core.connector
1919

2020
import scala.concurrent.Future
2121

22-
import org.apache.kafka.clients.producer.RecordMetadata
23-
2422
trait MessageProducer {
2523

2624
/** Count of messages sent. */
2725
def sentCount(): Long
2826

2927
/** Sends msg to topic. This is an asynchronous operation. */
30-
def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
28+
def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata]
3129

3230
/** Closes producer. */
3331
def close(): Unit

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder
2323

2424
import akka.actor.ActorSystem
2525
import akka.event.Logging.InfoLevel
26-
import org.apache.kafka.clients.producer.RecordMetadata
2726
import pureconfig._
2827
import pureconfig.generic.auto._
2928
import org.apache.openwhisk.common.LoggingMarkers._
@@ -189,7 +188,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
189188
/** 3. Send the activation to the invoker */
190189
protected def sendActivationToInvoker(producer: MessageProducer,
191190
msg: ActivationMessage,
192-
invoker: InvokerInstanceId): Future[RecordMetadata] = {
191+
invoker: InvokerInstanceId): Future[ResultMetadata] = {
193192
implicit val transid: TransactionId = msg.transid
194193

195194
val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
@@ -206,7 +205,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
206205
transid.finished(
207206
this,
208207
start,
209-
s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
208+
s"posted to ${status.topic}[${status.partition}][${status.offset}]",
210209
logLevel = InfoLevel)
211210
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
212211
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, P
88
import akka.event.Logging.InfoLevel
99
import akka.pattern.ask
1010
import akka.util.Timeout
11-
import org.apache.kafka.clients.producer.RecordMetadata
1211
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
1312
import org.apache.openwhisk.common._
1413
import org.apache.openwhisk.core.connector._
@@ -250,7 +249,7 @@ class FPCPoolBalancer(config: WhiskConfig,
250249
/** 3. Send the activation to the kafka */
251250
private def sendActivationToKafka(producer: MessageProducer,
252251
msg: ActivationMessage,
253-
topic: String): Future[RecordMetadata] = {
252+
topic: String): Future[ResultMetadata] = {
254253
implicit val transid: TransactionId = msg.transid
255254

256255
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
@@ -261,7 +260,7 @@ class FPCPoolBalancer(config: WhiskConfig,
261260
transid.finished(
262261
this,
263262
start,
264-
s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
263+
s"posted to ${status.topic}[${status.partition}][${status.offset}]",
265264
logLevel = InfoLevel)
266265
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
267266
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,16 @@
1818
package org.apache.openwhisk.core.loadBalancer
1919
import akka.actor.ActorRef
2020
import akka.actor.ActorRefFactory
21-
import org.apache.kafka.clients.producer.RecordMetadata
22-
import org.apache.openwhisk.core.connector.ActivationMessage
23-
import org.apache.openwhisk.core.connector.MessageProducer
24-
import org.apache.openwhisk.core.connector.MessagingProvider
21+
import org.apache.openwhisk.core.connector.{ActivationMessage, MessageProducer, MessagingProvider, ResultMetadata}
2522
import org.apache.openwhisk.core.entity.InvokerInstanceId
23+
2624
import scala.concurrent.Future
2725

2826
trait InvokerPoolFactory {
2927
def createInvokerPool(
3028
actorRefFactory: ActorRefFactory,
3129
messagingProvider: MessagingProvider,
3230
messagingProducer: MessageProducer,
33-
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
31+
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
3432
monitor: Option[ActorRef]): ActorRef
3533
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
2424
import scala.concurrent.duration._
2525
import scala.util.Failure
2626
import scala.util.Success
27-
import org.apache.kafka.clients.producer.RecordMetadata
2827
import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
2928
import akka.actor.FSM.CurrentState
3029
import akka.actor.FSM.SubscribeTransitionCallBack
@@ -76,7 +75,7 @@ final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
7675
* by the InvokerPool and thus might not be caught by monitoring.
7776
*/
7877
class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
79-
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
78+
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
8079
pingConsumer: MessageConsumer,
8180
monitor: Option[ActorRef])
8281
extends Actor {
@@ -230,7 +229,7 @@ object InvokerPool {
230229
}
231230

232231
def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
233-
p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
232+
p: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
234233
pc: MessageConsumer,
235234
m: Option[ActorRef] = None): Props = {
236235
Props(new InvokerPool(f, p, pc, m))
@@ -273,7 +272,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
273272

274273
// This is done at this point to not intermingle with the state-machine especially their timeouts.
275274
def customReceive: Receive = {
276-
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
275+
case _: ResultMetadata => // Ignores the result of publishing test actions to MessageProducer.
277276
}
278277

279278
override def receive: Receive = customReceive.orElse(super.receive)

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import akka.cluster.ClusterEvent._
2626
import akka.cluster.{Cluster, Member, MemberStatus}
2727
import akka.management.scaladsl.AkkaManagement
2828
import akka.management.cluster.bootstrap.ClusterBootstrap
29-
import org.apache.kafka.clients.producer.RecordMetadata
3029
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
3130
import pureconfig._
3231
import pureconfig.generic.auto._
@@ -340,7 +339,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
340339
actorRefFactory: ActorRefFactory,
341340
messagingProvider: MessagingProvider,
342341
messagingProducer: MessageProducer,
343-
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
342+
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
344343
monitor: Option[ActorRef]): ActorRef = {
345344

346345
InvokerPool.prepare(instance, WhiskEntityStore.datastore())

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
package org.apache.openwhisk.core.containerpool.v2
1919

2020
import java.util.concurrent.atomic.AtomicInteger
21-
2221
import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
23-
import org.apache.kafka.clients.producer.RecordMetadata
22+
2423
import org.apache.openwhisk.common._
2524
import org.apache.openwhisk.core.connector.ContainerCreationError._
2625
import org.apache.openwhisk.core.connector.{
2726
ContainerCreationAckMessage,
2827
ContainerCreationMessage,
29-
ContainerDeletionMessage
28+
ContainerDeletionMessage,
29+
ResultMetadata
3030
}
3131
import org.apache.openwhisk.core.containerpool.{
3232
AdjustPrewarmedContainer,
@@ -81,7 +81,7 @@ class FunctionPullingContainerPool(
8181
poolConfig: ContainerPoolConfig,
8282
instance: InvokerInstanceId,
8383
prewarmConfig: List[PrewarmingConfig] = List.empty,
84-
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
84+
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
8585
implicit val logging: Logging)
8686
extends Actor {
8787
import ContainerPoolV2.memoryConsumptionOf
@@ -841,7 +841,7 @@ object ContainerPoolV2 {
841841
poolConfig: ContainerPoolConfig,
842842
instance: InvokerInstanceId,
843843
prewarmConfig: List[PrewarmingConfig] = List.empty,
844-
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
844+
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
845845
implicit logging: Logging): Props = {
846846
Props(
847847
new FunctionPullingContainerPool(

0 commit comments

Comments
 (0)