Skip to content

Upgrade to Akka 2.6.12 #5065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,4 @@
register: result
until: result.status == 200
retries: 12
delay: 5
delay: 10
8 changes: 4 additions & 4 deletions ansible/roles/controller/tasks/join_akka_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
env: >-
{{ env | combine({
'CONFIG_akka_cluster_seedNodes_' ~ seedNode.0:
'akka.tcp://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
'akka://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
}) }}
with_indexed_items: "{{ controller.akka.cluster.seedNodes }}"
loop_control:
Expand All @@ -42,11 +42,11 @@
vars:
akka_env:
"CONFIG_akka_actor_provider": "{{ controller.akka.provider }}"
"CONFIG_akka_remote_netty_tcp_hostname":
"CONFIG_akka_remote_artery_canonical_hostname":
"{{ controller.akka.cluster.host[(controller_index | int)] }}"
"CONFIG_akka_remote_netty_tcp_port":
"CONFIG_akka_remote_artery_canonical_port":
"{{ controller.akka.cluster.basePort + (controller_index | int) }}"
"CONFIG_akka_remote_netty_tcp_bindPort":
"CONFIG_akka_remote_artery_bind_port":
"{{ controller.akka.cluster.bindPort }}"
set_fact:
env: "{{ env | combine(akka_env) }}"
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ subprojects {
'akka-discovery', 'akka-distributed-data', 'akka-protobuf', 'akka-remote', 'akka-slf4j',
'akka-stream', 'akka-stream-testkit', 'akka-testkit']
def akkaHttp = ['akka-http', 'akka-http-core', 'akka-http-spray-json', 'akka-http-testkit', 'akka-http-xml',
'akka-parsing']
'akka-parsing', 'akka-http2-support']

akka.forEach {
cons.add('compile', "com.typesafe.akka:${it}_${gradle.scala.depVersion}:${gradle.akka.version}")
Expand Down
4 changes: 4 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
# default application configuration file for akka
include "logging"

akka {
java-flight-recorder.enabled = false
}

akka.http {
client {
parsing.illegal-header-warnings = off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ package org.apache.openwhisk.common
import java.io.{FileInputStream, InputStream}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

import akka.http.scaladsl.ConnectionContext
import akka.http.scaladsl.{ConnectionContext, HttpsConnectionContext}
import akka.stream.TLSClientAuth
import com.typesafe.sslconfig.akka.AkkaSSLConfig

object Https {
case class HttpsConfig(keystorePassword: String, keystoreFlavor: String, keystorePath: String, clientAuth: String)
Expand All @@ -35,8 +33,16 @@ object Https {
cs
}

def connectionContext(httpsConfig: HttpsConfig, sslConfig: Option[AkkaSSLConfig] = None) = {
def httpsInsecureClient(context: SSLContext): HttpsConnectionContext =
ConnectionContext.httpsClient((host, port) => {
val engine = context.createSSLEngine(host, port)
engine.setUseClientMode(true)
// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment why it is ok here, given the warning in the comment? ie when is withDisableHostnameVerification set and ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment and the implementation is taken from the akka 2.6.x documentation on how to disable host name verification for ssl. This is what openwhisk was doing in couple of places including tests, but having the old SSLConfig class deprecated it had to be re-implemented. So I figured it, I'd rather keep comment to discourage using this approach anywhere in production.

Do you suggest adding more explanation on why is it needed to the code? Sure, I can do that.

Copy link
Member

@rabbah rabbah Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - thanks for the added clarification. I think this was/is needed for testing with self-signed certs.

engine
})

def applyHttpsConfig(httpsConfig: HttpsConfig, withDisableHostnameVerification: Boolean = false): SSLContext = {
val keyFactoryType = "SunX509"
val clientAuth = {
if (httpsConfig.clientAuth.toBoolean)
Expand All @@ -63,7 +69,27 @@ object Https {

val sslContext: SSLContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
sslContext
}

def connectionContextClient(httpsConfig: HttpsConfig,
withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
val sslContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
connectionContextClient(sslContext, withDisableHostnameVerification)
}

def connectionContextClient(sslContext: SSLContext,
withDisableHostnameVerification: Boolean): HttpsConnectionContext = {
if (withDisableHostnameVerification) {
httpsInsecureClient(sslContext)
} else {
ConnectionContext.httpsClient(sslContext)
}
}

ConnectionContext.https(sslContext, sslConfig, clientAuth = clientAuth)
def connectionContextServer(httpsConfig: HttpsConfig,
withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
val sslContext: SSLContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
ConnectionContext.httpsServer(sslContext)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
import akka.stream.{Graph, RestartSettings, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, RestartSink, Sink, Source}
import akka.util.ByteString

Expand Down Expand Up @@ -87,32 +87,33 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:
protected val writeToFile: Sink[ByteString, _] = MergeHub
.source[ByteString]
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
.to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
LogRotatorSink(() => {
val maxSize = bufferSize.toBytes
var bytesRead = maxSize
element =>
{
val size = element.size
if (bytesRead + size > maxSize) {
bytesRead = size
val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
logging.info(this, s"Rotating log file to '$logFilePath'")
try {
Files.createFile(logFilePath)
Files.setPosixFilePermissions(logFilePath, perms)
} catch {
case t: Throwable =>
logging.error(this, s"Couldn't create userlogs file: $t")
throw t
.to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
() =>
LogRotatorSink(() => {
val maxSize = bufferSize.toBytes
var bytesRead = maxSize
element =>
{
val size = element.size
if (bytesRead + size > maxSize) {
bytesRead = size
val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
logging.info(this, s"Rotating log file to '$logFilePath'")
try {
Files.createFile(logFilePath)
Files.setPosixFilePermissions(logFilePath, perms)
} catch {
case t: Throwable =>
logging.error(this, s"Couldn't create userlogs file: $t")
throw t
}
Some(logFilePath)
} else {
bytesRead += size
None
}
Some(logFilePath)
} else {
bytesRead += size
None
}
}
})
})
})
.run()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.time.Instant

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -62,7 +61,7 @@ object DockerToActivationLogStore {
*/
class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: ActorMaterializer = ActorMaterializer()(system)
implicit val actorSystem: ActorSystem = system

/* "json-file" is the log-driver that writes out to file */
override val containerParameters = Map("--log-driver" -> Set("json-file"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ class ElasticSearchLogStore(
elasticSearchConfig.logSchema.time)
}

implicit val actorSystem = system

private val esClient = new ElasticSearchRestClient(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.time.Instant
import java.time.temporal.ChronoUnit

import akka.actor.ActorSystem
import akka.http.scaladsl.ConnectionContext
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Post
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
Expand All @@ -32,14 +33,13 @@ import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import javax.net.ssl.{SSLContext, SSLEngine}
import pureconfig._
import pureconfig.generic.auto._

Expand Down Expand Up @@ -89,7 +89,6 @@ class SplunkLogStore(
extends LogDriverLogStore(actorSystem) {
implicit val as = actorSystem
implicit val ec = as.dispatcher
implicit val materializer = ActorMaterializer()
private val logging = new AkkaLogging(actorSystem.log)

private val splunkApi = Path / "services" / "search" / "jobs" //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
Expand All @@ -98,12 +97,22 @@ class SplunkLogStore(

val maxPendingRequests = 500

def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
val engine = SSLContext.getDefault.createSSLEngine(host, port)
engine.setUseClientMode(true)

// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.

engine
}

val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
host = splunkConfig.host,
port = splunkConfig.port,
connectionContext =
if (splunkConfig.disableSNI)
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
ConnectionContext.httpsClient(createInsecureSslEngine _)
else Http().defaultClientHttpsContext)

override def fetchLogs(namespace: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.nio.file.{Files, Path}
import java.time.Instant
import java.util.EnumSet

import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
import akka.util.ByteString
Expand All @@ -37,38 +38,38 @@ import scala.concurrent.duration._
class ActivationFileStorage(logFilePrefix: String,
logPath: Path,
writeResultToFile: Boolean,
actorMaterializer: ActorMaterializer,
actorSystem: ActorSystem,
logging: Logging) {

implicit val materializer = actorMaterializer
implicit val system: ActorSystem = actorSystem

private var logFile = logPath
private val bufferSize = 100.MB
private val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
private val writeToFile: Sink[ByteString, _] = MergeHub
.source[ByteString]
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
.to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
LogRotatorSink(() => {
val maxSize = bufferSize.toBytes
var bytesRead = maxSize
element =>
{
val size = element.size

if (bytesRead + size > maxSize) {
logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")

logging.info(this, s"Rotating log file to '$logFile'")
createLogFile(logFile)
bytesRead = size
Some(logFile)
} else {
bytesRead += size
None
.to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
() =>
LogRotatorSink(() => {
val maxSize = bufferSize.toBytes
var bytesRead = maxSize
element =>
{
val size = element.size

if (bytesRead + size > maxSize) {
logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")

logging.info(this, s"Rotating log file to '$logFile'")
createLogFile(logFile)
bytesRead = size
Some(logFile)
} else {
bytesRead += size
None
}
}
}
})
})
})
.run()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.openwhisk.core.database
import java.time.Instant

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, TransactionId}
Expand Down Expand Up @@ -199,5 +198,5 @@ trait ActivationStore {
}

trait ActivationStoreProvider extends Spi {
def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging): ActivationStore
def instance(actorSystem: ActorSystem, logging: Logging): ActivationStore
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@ package org.apache.openwhisk.core.database
import java.time.Instant

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.entity._

import scala.concurrent.Future
import scala.util.{Failure, Success}

class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging)
extends ActivationStore {
class ArtifactActivationStore(actorSystem: ActorSystem, logging: Logging) extends ActivationStore {

implicit val executionContext = actorSystem.dispatcher

private val artifactStore: ArtifactStore[WhiskActivation] =
WhiskActivationStore.datastore()(actorSystem, logging, actorMaterializer)
WhiskActivationStore.datastore()(actorSystem, logging)

def store(activation: WhiskActivation, context: UserContext)(
implicit transid: TransactionId,
Expand Down Expand Up @@ -131,6 +129,6 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor
}

object ArtifactActivationStoreProvider extends ActivationStoreProvider {
override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
new ArtifactActivationStore(actorSystem, actorMaterializer, logging)
override def instance(actorSystem: ActorSystem, logging: Logging) =
new ArtifactActivationStore(actorSystem, logging)
}
Loading