Skip to content

Clean Up Etcd Worker Actor #5323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.apache.openwhisk.core.etcd

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.etcd.EtcdWorker.GetLeaseAndRetry
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
import org.apache.openwhisk.core.service.{
AlreadyExist,
Done,
ElectLeader,
ElectionResult,
FinishWork,
GetLease,
InitialDataStorageResults,
Lease,
RegisterData,
RegisterInitialData,
WatcherClosed
}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.Success

/**
 * Actor that executes etcd operations (leader election, data registration, initial data
 * registration and deletion) and keeps retrying a failed operation until it succeeds.
 *
 * On construction it sends `GetLease` to `leaseService` and stores every `Lease` message it
 * receives. Operations that need a lease are postponed through [[EtcdWorker.GetLeaseAndRetry]]
 * while no lease is held.
 *
 * Failures observed inside the asynchronous callbacks are never handled in place: they are turned
 * into a `GetLeaseAndRetry` message sent to `self`, so `lease` is only modified while the actor is
 * processing a message.
 *
 * @param etcdClient   client used to issue the etcd requests
 * @param leaseService actor that is sent `GetLease`; presumably it replies with a `Lease`
 */
class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
                                                                 actorSystem: ActorSystem,
                                                                 logging: Logging)
    extends Actor
    with Timers {

  // the parent is told `FinishWork(key)` once the operation for a key is done
  private val dataManagementService = context.parent
  private var lease: Option[Lease] = None
  leaseService ! GetLease

  override def receive: Receive = {
    // a lease has been issued, use it for the subsequent operations
    case msg: Lease =>
      lease = Some(msg)

    // a request could not be completed: log the reason, refresh the lease unless told otherwise,
    // and deliver the original request to this actor again after `retryInterval`
    case msg: GetLeaseAndRetry =>
      logging.warn(this, msg.log)
      if (!msg.skipLeaseRefresh) {
        if (msg.clearLease) {
          lease = None
        }
        leaseService ! GetLease
      }
      sendMessageToSelfAfter(msg.request, retryInterval)

    // leader election + endpoint management
    case request: ElectLeader =>
      lease match {
        case Some(l) =>
          etcdClient
            .electLeader(request.key, request.value, l)
            .andThen {
              case Success(msg) =>
                request.recipient ! ElectionResult(msg)
                dataManagementService ! FinishWork(request.key)
            }
            .recover {
              // treated as an expired lease: drop it, request a new one and retry after `retryInterval`
              case t: StatusRuntimeException =>
                self ! GetLeaseAndRetry(request, s"a lease is expired while leader election, reissue it: $t")
              // any other failure: keep the current lease and retry until the election completes
              case t: Throwable =>
                self ! GetLeaseAndRetry(
                  request,
                  s"unexpected error happened: $t, retry storing data",
                  skipLeaseRefresh = true)
            }
        case None =>
          // no lease yet: ask for one again (there is nothing to clear) and retry later
          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
      }

    // only endpoint management
    case request: RegisterData =>
      lease match {
        case Some(l) =>
          etcdClient
            .put(request.key, request.value, l.id)
            .andThen {
              case Success(_) =>
                dataManagementService ! FinishWork(request.key)
            }
            .recover {
              // treated as an expired lease: drop it, request a new one and retry after `retryInterval`
              case t: StatusRuntimeException =>
                self ! GetLeaseAndRetry(
                  request,
                  s"a lease is expired while registering data ${request.key}, reissue it: $t")
              // any other failure: keep the current lease and retry until the data is stored
              case t: Throwable =>
                self ! GetLeaseAndRetry(
                  request,
                  s"unexpected error happened: $t, retry storing data ${request.key}",
                  skipLeaseRefresh = true)
            }
        case None =>
          // no lease yet: ask for one again (there is nothing to clear) and retry later
          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
      }

    // it stores the data only if there is no such data yet
    case request: RegisterInitialData =>
      lease match {
        case Some(l) =>
          etcdClient
            .putTxn(request.key, request.value, 0, l.id)
            .map { res =>
              dataManagementService ! FinishWork(request.key)
              // `foreach` rather than `map`: the optional recipient is only notified, nothing is produced
              if (res.getSucceeded) {
                logging.info(this, s"initial data storing succeeds for ${request.key}")
                request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Right(Done())))
              } else {
                logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
                request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
              }
            }
            .recover {
              // treated as an expired lease: drop it, request a new one and retry after `retryInterval`
              case t: StatusRuntimeException =>
                self ! GetLeaseAndRetry(
                  request,
                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
              // any other failure: keep the current lease and retry until the data is stored
              case t: Throwable =>
                self ! GetLeaseAndRetry(
                  request,
                  s"unexpected error happened: $t, retry storing data ${request.key}",
                  skipLeaseRefresh = true)
            }
        case None =>
          // no lease yet: ask for one again (there is nothing to clear) and retry later
          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
      }

    // a watcher is closed: delete the key; the deletion itself does not take the lease as an argument
    case msg: WatcherClosed =>
      etcdClient
        .del(msg.key)
        .andThen {
          case Success(_) =>
            dataManagementService ! FinishWork(msg.key)
        }
        .recover {
          // handled like the other operations: the lease is dropped and reissued before retrying
          case t: StatusRuntimeException =>
            self ! GetLeaseAndRetry(msg, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
          // any other failure: keep the current lease and retry until the data is deleted
          case t: Throwable =>
            self ! GetLeaseAndRetry(
              msg,
              s"unexpected error happened: $t, retry deleting data for ${msg.key}",
              skipLeaseRefresh = true)
        }
  }

  /**
   * Schedules `msg` to be delivered to this actor once `retryInterval` has elapsed.
   *
   * The message doubles as the timer key, so scheduling an equal message again while a retry is
   * still pending replaces the pending timer instead of queuing a second delivery.
   *
   * @param msg           the request to deliver again
   * @param retryInterval delay before the request is delivered
   */
  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration): Unit = {
    timers.startSingleTimer(msg, msg, retryInterval)
  }
}

/** Companion of the [[EtcdWorker]] actor: its `Props` factory and its internal retry message. */
object EtcdWorker {

  /**
   * Creates the `Props` used to start an [[EtcdWorker]].
   *
   * @param etcdClient   client handed to the worker
   * @param leaseService actor handed to the worker as its lease provider
   */
  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
                                                            actorSystem: ActorSystem,
                                                            logging: Logging): Props =
    Props(new EtcdWorker(etcdClient, leaseService))

  /**
   * Message describing a request that has to be retried.
   *
   * @param request          the original message to process again
   * @param log              the reason for the retry, as a log line
   * @param clearLease       whether the currently held lease should be dropped (default `true`)
   * @param skipLeaseRefresh whether requesting a new lease should be skipped (default `false`)
   */
  case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ package org.apache.openwhisk.core.service

import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.util.Timeout
import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}
import pureconfig.loadConfigOrThrow

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{Map, Queue}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Success

// messages received by the actor
// it is required to specify a recipient directly for the retryable message processing
Expand Down Expand Up @@ -181,148 +177,3 @@ object DataManagementService {
Props(new DataManagementService(watcherService, workerFactory))
}
}

/**
 * Actor that executes etcd operations (leader election, data registration, initial data
 * registration and deletion) and re-schedules a failed operation after `retryInterval`.
 *
 * On construction it sends `GetLease` to `leaseService` and stores every `Lease` it receives.
 *
 * NOTE(review): several `.recover` blocks below assign `lease = None`. Those blocks are callbacks on
 * the result of the etcd call (presumably a Future running on `ec`), so the assignment appears to
 * happen outside the actor's message handling — confirm whether this unsynchronized write is safe.
 *
 * @param etcdClient   client used to issue the etcd requests
 * @param leaseService actor that is sent `GetLease`; presumably it replies with a `Lease`
 */
private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
actorSystem: ActorSystem,
logging: Logging)
extends Actor {

// the parent is told `FinishWork(key)` once the operation for a key is done
private val dataManagementService = context.parent
private var lease: Option[Lease] = None
leaseService ! GetLease

override def receive: Receive = {
// a lease has been issued, use it for the subsequent operations
case msg: Lease =>
lease = Some(msg)

// leader election + endpoint management
case request: ElectLeader =>
lease match {
case Some(l) =>
etcdClient
.electLeader(request.key, request.value, l)
.andThen {
case Success(msg) =>
request.recipient ! ElectionResult(msg)
dataManagementService ! FinishWork(request.key)
}
.recover {
// treated as an expired lease: drop it, request a new one and retry after `retryInterval`
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
// NOTE(review): actor state is written from an asynchronous callback here
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// any other failure: keep the current lease and retry until the election completes
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
// no lease yet: ask for one again and retry later
logging.warn(this, s"lease not found, retry storing data")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

// only endpoint management
case request: RegisterData =>
lease match {
case Some(l) =>
etcdClient
.put(request.key, request.value, l.id)
.andThen {
case Success(_) =>
dataManagementService ! FinishWork(request.key)
}
.recover {
// treated as an expired lease: drop it, request a new one and retry after `retryInterval`
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
// NOTE(review): actor state is written from an asynchronous callback here
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// any other failure: keep the current lease and retry until the data is stored
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
// no lease yet: ask for one again and retry later
logging.warn(this, s"lease not found, retry storing data ${request.key}")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

// it stores the data only if there is no such data yet
case request: RegisterInitialData =>
lease match {
case Some(l) =>
etcdClient
.putTxn(request.key, request.value, 0, l.id)
.map { res =>
dataManagementService ! FinishWork(request.key)
// the optional recipient is informed whether the data was stored or already existed
if (res.getSucceeded) {
logging.info(this, s"initial data storing succeeds for ${request.key}")
request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
} else {
logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
}
}
.recover {
// treated as an expired lease: drop it, request a new one and retry after `retryInterval`
case t: StatusRuntimeException =>
logging.warn(
this,
s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
// NOTE(review): actor state is written from an asynchronous callback here
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// any other failure: keep the current lease and retry until the data is stored
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
// no lease yet: ask for one again and retry later
logging.warn(this, s"lease not found, retry storing data for ${request.key}")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

// a watcher is closed: delete the key; the deletion itself does not take the lease as an argument
case msg: WatcherClosed =>
etcdClient
.del(msg.key)
.andThen {
case Success(_) =>
dataManagementService ! FinishWork(msg.key)
}
.recover {
// handled like the other operations: the lease is dropped and reissued before retrying
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
// NOTE(review): actor state is written from an asynchronous callback here
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(msg, retryInterval)

// any other failure: keep the current lease and retry until the data is deleted
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
sendMessageToSelfAfter(msg, retryInterval)
}

}

// schedules `msg` to be sent to this actor once `retryInterval` has elapsed, using the
// actor system's scheduler
private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
}
}

/** Companion holding the `Props` factory of the `EtcdWorker` actor. */
object EtcdWorker {

  /**
   * Creates the `Props` used to start an `EtcdWorker`.
   *
   * @param etcdClient   client handed to the worker
   * @param leaseService actor handed to the worker as its lease provider
   */
  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
                                                            actorSystem: ActorSystem,
                                                            logging: Logging): Props =
    Props(new EtcdWorker(etcdClient, leaseService))
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
import org.apache.openwhisk.spi.SpiLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentEx
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.grpc.ActivationServiceHandler
import org.apache.openwhisk.http.BasicHttpService
Expand Down
Loading