Skip to content

Fix build error #5123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
import akka.grpc.internal.ClientClosedException
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
Expand Down Expand Up @@ -73,7 +72,6 @@ class ActivationClientProxy(
containerId: ContainerId,
activationClientFactory: (String, FullyQualifiedEntityName, String, Int, Boolean) => Future[ActivationServiceClient])(
implicit actorSystem: ActorSystem,
mat: ActorMaterializer,
logging: Logging)
extends FSM[ActivationClientProxyState, ActivationClientProxyData]
with Stash {
Expand Down Expand Up @@ -406,13 +404,12 @@ object ActivationClientProxy {
schedulerHost: String,
rpcPort: Int,
containerId: ContainerId,
activationClientFactory: (String,
FullyQualifiedEntityName,
String,
Int,
Boolean) => Future[ActivationServiceClient])(implicit actorSystem: ActorSystem,
mat: ActorMaterializer,
logging: Logging) = {
activationClientFactory: (
String,
FullyQualifiedEntityName,
String,
Int,
Boolean) => Future[ActivationServiceClient])(implicit actorSystem: ActorSystem, logging: Logging) = {
Props(
new ActivationClientProxy(
invocationNamespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.io.{IO, Tcp}
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.common.{LoggingMarkers, TransactionId, _}
import org.apache.openwhisk.core.ConfigKeys
Expand Down Expand Up @@ -194,7 +193,7 @@ class FunctionPullingContainerProxy(
poolConfig: ContainerPoolConfig,
timeoutConfig: ContainerProxyTimeoutConfig,
healtCheckConfig: ContainerProxyHealthCheckConfig,
testTcp: Option[ActorRef])(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging)
testTcp: Option[ActorRef])(implicit actorSystem: ActorSystem, logging: Logging)
extends FSM[ProxyState, Data]
with Stash {
startWith(Uninitialized, NonexistentData())
Expand Down Expand Up @@ -373,7 +372,7 @@ class FunctionPullingContainerProxy(
case Event(initializedData: InitializedData, _) =>
context.parent ! Initialized(initializedData)
initializedData.clientProxy ! RequestActivation()
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
stay() using initializedData

// 2. read executable action data from db
Expand Down Expand Up @@ -506,7 +505,7 @@ class FunctionPullingContainerProxy(
// 1. request activation message to client
case Event(activationResult: RunActivationCompleted, data: WarmData) =>
// create timeout
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
data.clientProxy ! RequestActivation(activationResult.duration)
stay() using data

Expand Down Expand Up @@ -613,7 +612,7 @@ class FunctionPullingContainerProxy(
logging.info(
this,
s"Remain running activations ${runningActivations.keySet().toString()} when received ClientClosed")
setTimer(RunningActivationTimeoutName, ClientClosed, runningActivationTimeout)
startSingleTimer(RunningActivationTimeoutName, ClientClosed, runningActivationTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if startTimerAtFixedRate​ should be used instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stay
}

Expand Down Expand Up @@ -674,7 +673,7 @@ class FunctionPullingContainerProxy(
// since akka port will no be used, we can put any value except 0 here
data.clientProxy ! RequestActivation(
newScheduler = Some(SchedulerEndpoints(job.schedulerHost, job.rpcPort, 10)))
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
timedOut = false
}
.recover {
Expand Down Expand Up @@ -715,7 +714,7 @@ class FunctionPullingContainerProxy(
logging.info(
this,
s"This is the remaining container for ${data.action}. The container will stop after $warmedContainerKeepingTimeout.")
setTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
startSingleTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startTimerAtFixedRate instead ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the source code

  def startTimerAtFixedRate(name: String, msg: Any, interval: FiniteDuration): Unit =
    startTimer(name, msg, interval, FixedRateMode)

  ...
  /** INTERNAL API */
  @InternalApi
  private[akka] case object FixedRateMode extends TimerMode {
    override def repeat: Boolean = true
  }

the method: startTimerAtFixedRate will invoke startTimer with repeat: true,
but for here, it is a one time timer,
So startSingleTimer is ok here.

stay

case Event(Remove | GracefulShutdown, data: WarmData) =>
Expand Down Expand Up @@ -777,7 +776,7 @@ class FunctionPullingContainerProxy(
}
}
unstashAll()
case _ -> Paused => setTimer(IdleTimeoutName, StateTimeout, idleTimeout)
case _ -> Paused => startSingleTimer(IdleTimeoutName, StateTimeout, idleTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startTimerAtFixedRate instead ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case _ -> Removing => unstashAll()
}

Expand Down Expand Up @@ -1245,7 +1244,7 @@ object FunctionPullingContainerProxy {
timeoutConfig: ContainerProxyTimeoutConfig,
healthCheckConfig: ContainerProxyHealthCheckConfig =
loadConfigOrThrow[ContainerProxyHealthCheckConfig](ConfigKeys.containerProxyHealth),
tcp: Option[ActorRef] = None)(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging) =
tcp: Option[ActorRef] = None)(implicit actorSystem: ActorSystem, logging: Logging) =
Props(
new FunctionPullingContainerProxy(
factory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import akka.Done
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorSystem}
import akka.grpc.internal.ClientClosedException
import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import common.StreamLogging
import io.grpc.StatusRuntimeException
Expand Down Expand Up @@ -59,7 +58,6 @@ class ActivationClientProxyTests

override def afterAll: Unit = TestKit.shutdownActorSystem(system)

implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher

val timeout = 20.seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack,
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.http.scaladsl.model
import akka.io.Tcp.Connect
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import akka.util.ByteString
Expand Down Expand Up @@ -86,7 +85,6 @@ class FunctionPullingContainerProxyTests
super.afterAll()
}

implicit val mat = ActorMaterializer()
implicit val ece: ExecutionContextExecutor = system.dispatcher

val timeout = 20.seconds
Expand Down