Skip to content

Replace kafka.RecordMetadata with a common ResultMetadata #5217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import pureconfig.generic.auto._
import org.apache.openwhisk.common.{Counter, Logging, TransactionId}
import org.apache.openwhisk.connector.kafka.KafkaConfiguration._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.connector.{Message, MessageProducer}
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
import org.apache.openwhisk.core.entity.{ByteSize, UUIDs}
import org.apache.openwhisk.utils.Exceptions

Expand All @@ -49,17 +49,18 @@ class KafkaProducerConnector(
override def sentCount(): Long = sentCounter.cur

/** Sends msg to topic. This is an asynchronous operation. */
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
val produced = Promise[RecordMetadata]()
val produced = Promise[ResultMetadata]()

Future {
blocking {
try {
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception == null) produced.trySuccess(metadata)
if (exception == null)
produced.trySuccess(ResultMetadata(metadata.topic(), metadata.partition(), metadata.offset()))
else produced.tryFailure(exception)
}
})
Expand All @@ -72,7 +73,7 @@ class KafkaProducerConnector(

produced.future.andThen {
case Success(status) =>
logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
logging.debug(this, s"sent message: ${status.topic}[${status.partition}][${status.offset}]")
sentCounter.next()
case Failure(t) =>
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package org.apache.openwhisk.connector.lean

import akka.actor.ActorSystem

import scala.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.openwhisk.common.Counter
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageProducer
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.mutable.Map
Expand All @@ -39,15 +37,15 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
override def sentCount(): Long = sentCounter.cur

/** Sends msg to topic. This is an asynchronous operation. */
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
implicit val transid = msg.transid

val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())

Future {
queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
sentCounter.next()
new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1)
ResultMetadata(topic, 0, -1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ trait Message {
override def toString = serialize
}

case class ResultMetadata(topic: String, partition: Int, offset: Long)

case class ActivationMessage(override val transid: TransactionId,
action: FullyQualifiedEntityName,
revision: DocRevision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package org.apache.openwhisk.core.connector

import scala.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata

trait MessageProducer {

/** Count of messages sent. */
def sentCount(): Long

/** Sends msg to topic. This is an asynchronous operation. */
def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata]

/** Closes producer. */
def close(): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder

import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common.LoggingMarkers._
Expand Down Expand Up @@ -189,7 +188,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
/** 3. Send the activation to the invoker */
protected def sendActivationToInvoker(producer: MessageProducer,
msg: ActivationMessage,
invoker: InvokerInstanceId): Future[RecordMetadata] = {
invoker: InvokerInstanceId): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid

val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
Expand All @@ -206,7 +205,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
transid.finished(
this,
start,
s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
s"posted to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, P
import akka.event.Logging.InfoLevel
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
Expand Down Expand Up @@ -250,7 +249,7 @@ class FPCPoolBalancer(config: WhiskConfig,
/** 3. Send the activation to the kafka */
private def sendActivationToKafka(producer: MessageProducer,
msg: ActivationMessage,
topic: String): Future[RecordMetadata] = {
topic: String): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid

MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
Expand All @@ -261,7 +260,7 @@ class FPCPoolBalancer(config: WhiskConfig,
transid.finished(
this,
start,
s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
s"posted to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@
package org.apache.openwhisk.core.loadBalancer
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.connector.MessageProducer
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.connector.{ActivationMessage, MessageProducer, MessagingProvider, ResultMetadata}
import org.apache.openwhisk.core.entity.InvokerInstanceId

import scala.concurrent.Future

trait InvokerPoolFactory {
def createInvokerPool(
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
monitor: Option[ActorRef]): ActorRef
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import org.apache.kafka.clients.producer.RecordMetadata
import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
import akka.actor.FSM.CurrentState
import akka.actor.FSM.SubscribeTransitionCallBack
Expand Down Expand Up @@ -76,7 +75,7 @@ final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
* by the InvokerPool and thus might not be caught by monitoring.
*/
class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
pingConsumer: MessageConsumer,
monitor: Option[ActorRef])
extends Actor {
Expand Down Expand Up @@ -230,7 +229,7 @@ object InvokerPool {
}

def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
p: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
pc: MessageConsumer,
m: Option[ActorRef] = None): Props = {
Props(new InvokerPool(f, p, pc, m))
Expand Down Expand Up @@ -273,7 +272,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
case _: ResultMetadata => // Ignores the result of publishing test actions to MessageProducer.
}

override def receive: Receive = customReceive.orElse(super.receive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import akka.cluster.ClusterEvent._
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import pureconfig.generic.auto._
Expand Down Expand Up @@ -340,7 +339,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
monitor: Option[ActorRef]): ActorRef = {

InvokerPool.prepare(instance, WhiskEntityStore.datastore())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
package org.apache.openwhisk.core.containerpool.v2

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
import org.apache.kafka.clients.producer.RecordMetadata

import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector.ContainerCreationError._
import org.apache.openwhisk.core.connector.{
ContainerCreationAckMessage,
ContainerCreationMessage,
ContainerDeletionMessage
ContainerDeletionMessage,
ResultMetadata
}
import org.apache.openwhisk.core.containerpool.{
AdjustPrewarmedContainer,
Expand Down Expand Up @@ -81,7 +81,7 @@ class FunctionPullingContainerPool(
poolConfig: ContainerPoolConfig,
instance: InvokerInstanceId,
prewarmConfig: List[PrewarmingConfig] = List.empty,
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit val logging: Logging)
extends Actor {
import ContainerPoolV2.memoryConsumptionOf
Expand Down Expand Up @@ -841,7 +841,7 @@ object ContainerPoolV2 {
poolConfig: ContainerPoolConfig,
instance: InvokerInstanceId,
prewarmConfig: List[PrewarmingConfig] = List.empty,
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit logging: Logging): Props = {
Props(
new FunctionPullingContainerPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets

import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.WarmUp.isWarmUpAction
import org.apache.openwhisk.core.WhiskConfig
Expand Down Expand Up @@ -48,7 +47,7 @@ class ContainerMessageConsumer(
msgProvider: MessagingProvider,
longPollDuration: FiniteDuration,
maxPeek: Int,
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit actorSystem: ActorSystem,
executionContext: ExecutionContext,
logging: Logging) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.http.scaladsl.server.Route
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.KvClient.Watch
import com.ibm.etcd.client.kv.WatchUpdate
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck, MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
Expand Down Expand Up @@ -267,7 +266,7 @@ class FPCInvokerReactive(config: WhiskConfig,
}

private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
creationAckMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
val topic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
val reschedulable =
creationAckMessage.error.map(ContainerCreationError.whiskErrors.contains(_)).getOrElse(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.ContainerCreationError.{
Expand Down Expand Up @@ -275,7 +274,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,

private def sendCreationContainerToInvoker(producer: MessageProducer,
invoker: Int,
msg: ContainerCreationMessage): Future[RecordMetadata] = {
msg: ContainerCreationMessage): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid

val topic = s"${Scheduler.topicPrefix}invoker$invoker"
Expand All @@ -286,8 +285,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
transid.finished(
this,
start,
s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
.topic()}[${status.partition()}][${status.offset()}]",
s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) =>
logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic")
Expand All @@ -297,7 +295,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,

private def sendDeletionContainerToInvoker(producer: MessageProducer,
invoker: Int,
msg: ContainerDeletionMessage): Future[RecordMetadata] = {
msg: ContainerDeletionMessage): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid

val topic = s"${Scheduler.topicPrefix}invoker$invoker"
Expand All @@ -308,8 +306,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
transid.finished(
this,
start,
s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
.topic()}[${status.partition()}][${status.offset()}]",
s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) =>
logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic")
Expand Down
Loading