Skip to content

Commit 5abf9d6

Browse files
committed
- removed usage implicit custom ActorMaterializer
- upgraded to Akka Http 10.2.3 - changed the way how Https set up when host name verification is disabled - changed RestartSink.withBackoff to accept RestartSettings - changed akka.actor.FSM.setTimer => akka.actor.FSM.startTimerAtFixedRate - changed akka.actor.Scheduler.schedule => akka.actor.Scheduler.scheduleAtFixedRate - updated to the new signature of Source.actorRef - replaced deprecated HttpResponse.copy - replaced Gzip with Coders.Gzip - fixed the scalafmt in tests - removed the implicit Materializer shutdown considering its lifecycle is managed by akka - increased the ansible wait time - switched to classic networking - disabled JFR
1 parent a201e02 commit 5abf9d6

File tree

108 files changed

+352
-559
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+352
-559
lines changed

ansible/roles/controller/tasks/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,4 +361,4 @@
361361
register: result
362362
until: result.status == 200
363363
retries: 12
364-
delay: 5
364+
delay: 10

ansible/roles/controller/tasks/join_akka_cluster.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
env: >-
3333
{{ env | combine({
3434
'CONFIG_akka_cluster_seedNodes_' ~ seedNode.0:
35-
'akka.tcp://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
35+
'akka://controller-actor-system@'~seedNode.1~':'~(controller.akka.cluster.basePort+seedNode.0)
3636
}) }}
3737
with_indexed_items: "{{ controller.akka.cluster.seedNodes }}"
3838
loop_control:
@@ -42,11 +42,11 @@
4242
vars:
4343
akka_env:
4444
"CONFIG_akka_actor_provider": "{{ controller.akka.provider }}"
45-
"CONFIG_akka_remote_netty_tcp_hostname":
45+
"CONFIG_akka_remote_artery_canonical_hostname":
4646
"{{ controller.akka.cluster.host[(controller_index | int)] }}"
47-
"CONFIG_akka_remote_netty_tcp_port":
47+
"CONFIG_akka_remote_artery_canonical_port":
4848
"{{ controller.akka.cluster.basePort + (controller_index | int) }}"
49-
"CONFIG_akka_remote_netty_tcp_bindPort":
49+
"CONFIG_akka_remote_artery_bind_port":
5050
"{{ controller.akka.cluster.bindPort }}"
5151
set_fact:
5252
env: "{{ env | combine(akka_env) }}"

common/scala/src/main/resources/application.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
# default application configuration file for akka
1919
include "logging"
2020

21+
akka {
22+
java-flight-recorder.enabled = false
23+
}
24+
2125
akka.http {
2226
client {
2327
parsing.illegal-header-warnings = off

common/scala/src/main/scala/org/apache/openwhisk/common/Https.scala

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ package org.apache.openwhisk.common
2020
import java.io.{FileInputStream, InputStream}
2121
import java.security.{KeyStore, SecureRandom}
2222
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
23-
24-
import akka.http.scaladsl.ConnectionContext
23+
import akka.http.scaladsl.{ConnectionContext, HttpsConnectionContext}
2524
import akka.stream.TLSClientAuth
26-
import com.typesafe.sslconfig.akka.AkkaSSLConfig
2725

2826
object Https {
2927
case class HttpsConfig(keystorePassword: String, keystoreFlavor: String, keystorePath: String, clientAuth: String)
@@ -35,8 +33,16 @@ object Https {
3533
cs
3634
}
3735

38-
def connectionContext(httpsConfig: HttpsConfig, sslConfig: Option[AkkaSSLConfig] = None) = {
36+
def httpsInsecureClient(context: SSLContext): HttpsConnectionContext =
37+
ConnectionContext.httpsClient((host, port) => {
38+
val engine = context.createSSLEngine(host, port)
39+
engine.setUseClientMode(true)
40+
// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
41+
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
42+
engine
43+
})
3944

45+
def applyHttpsConfig(httpsConfig: HttpsConfig, withDisableHostnameVerification: Boolean = false): SSLContext = {
4046
val keyFactoryType = "SunX509"
4147
val clientAuth = {
4248
if (httpsConfig.clientAuth.toBoolean)
@@ -63,7 +69,27 @@ object Https {
6369

6470
val sslContext: SSLContext = SSLContext.getInstance("TLS")
6571
sslContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
72+
sslContext
73+
}
74+
75+
def connectionContextClient(httpsConfig: HttpsConfig,
76+
withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
77+
val sslContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
78+
connectionContextClient(sslContext, withDisableHostnameVerification)
79+
}
80+
81+
def connectionContextClient(sslContext: SSLContext,
82+
withDisableHostnameVerification: Boolean): HttpsConnectionContext = {
83+
if (withDisableHostnameVerification) {
84+
httpsInsecureClient(sslContext)
85+
} else {
86+
ConnectionContext.httpsClient(sslContext)
87+
}
88+
}
6689

67-
ConnectionContext.https(sslContext, sslConfig, clientAuth = clientAuth)
90+
def connectionContextServer(httpsConfig: HttpsConfig,
91+
withDisableHostnameVerification: Boolean = false): HttpsConnectionContext = {
92+
val sslContext: SSLContext = applyHttpsConfig(httpsConfig, withDisableHostnameVerification)
93+
ConnectionContext.httpsServer(sslContext)
6894
}
6995
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import java.time.Instant
3232
import akka.NotUsed
3333
import akka.actor.ActorSystem
3434
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
35-
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
35+
import akka.stream.{Graph, RestartSettings, SinkShape, UniformFanOutShape}
3636
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, RestartSink, Sink, Source}
3737
import akka.util.ByteString
3838

@@ -87,32 +87,33 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:
8787
protected val writeToFile: Sink[ByteString, _] = MergeHub
8888
.source[ByteString]
8989
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
90-
.to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
91-
LogRotatorSink(() => {
92-
val maxSize = bufferSize.toBytes
93-
var bytesRead = maxSize
94-
element =>
95-
{
96-
val size = element.size
97-
if (bytesRead + size > maxSize) {
98-
bytesRead = size
99-
val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
100-
logging.info(this, s"Rotating log file to '$logFilePath'")
101-
try {
102-
Files.createFile(logFilePath)
103-
Files.setPosixFilePermissions(logFilePath, perms)
104-
} catch {
105-
case t: Throwable =>
106-
logging.error(this, s"Couldn't create userlogs file: $t")
107-
throw t
90+
.to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
91+
() =>
92+
LogRotatorSink(() => {
93+
val maxSize = bufferSize.toBytes
94+
var bytesRead = maxSize
95+
element =>
96+
{
97+
val size = element.size
98+
if (bytesRead + size > maxSize) {
99+
bytesRead = size
100+
val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
101+
logging.info(this, s"Rotating log file to '$logFilePath'")
102+
try {
103+
Files.createFile(logFilePath)
104+
Files.setPosixFilePermissions(logFilePath, perms)
105+
} catch {
106+
case t: Throwable =>
107+
logging.error(this, s"Couldn't create userlogs file: $t")
108+
throw t
109+
}
110+
Some(logFilePath)
111+
} else {
112+
bytesRead += size
113+
None
108114
}
109-
Some(logFilePath)
110-
} else {
111-
bytesRead += size
112-
None
113115
}
114-
}
115-
})
116+
})
116117
})
117118
.run()
118119

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.time.Instant
2121

2222
import akka.NotUsed
2323
import akka.actor.ActorSystem
24-
import akka.stream.ActorMaterializer
2524
import akka.stream.scaladsl.Sink
2625
import akka.stream.scaladsl.Flow
2726
import akka.stream.scaladsl.Source
@@ -62,7 +61,7 @@ object DockerToActivationLogStore {
6261
*/
6362
class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
6463
implicit val ec: ExecutionContext = system.dispatcher
65-
implicit val mat: ActorMaterializer = ActorMaterializer()(system)
64+
implicit val actorSystem: ActorSystem = system
6665

6766
/* "json-file" is the log-driver that writes out to file */
6867
override val containerParameters = Map("--log-driver" -> Set("json-file"))

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ class ElasticSearchLogStore(
7575
elasticSearchConfig.logSchema.time)
7676
}
7777

78-
implicit val actorSystem = system
79-
8078
private val esClient = new ElasticSearchRestClient(
8179
elasticSearchConfig.protocol,
8280
elasticSearchConfig.host,

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.time.Instant
2121
import java.time.temporal.ChronoUnit
2222

2323
import akka.actor.ActorSystem
24+
import akka.http.scaladsl.ConnectionContext
2425
import akka.http.scaladsl.Http
2526
import akka.http.scaladsl.client.RequestBuilding.Post
2627
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
@@ -32,14 +33,13 @@ import akka.http.scaladsl.model.Uri.Path
3233
import akka.http.scaladsl.model.headers.Authorization
3334
import akka.http.scaladsl.model.headers.BasicHttpCredentials
3435
import akka.http.scaladsl.unmarshalling.Unmarshal
35-
import akka.stream.ActorMaterializer
3636
import akka.stream.OverflowStrategy
3737
import akka.stream.QueueOfferResult
3838
import akka.stream.scaladsl.Flow
3939
import akka.stream.scaladsl.Keep
4040
import akka.stream.scaladsl.Sink
4141
import akka.stream.scaladsl.Source
42-
import com.typesafe.sslconfig.akka.AkkaSSLConfig
42+
import javax.net.ssl.{SSLContext, SSLEngine}
4343
import pureconfig._
4444
import pureconfig.generic.auto._
4545

@@ -89,7 +89,6 @@ class SplunkLogStore(
8989
extends LogDriverLogStore(actorSystem) {
9090
implicit val as = actorSystem
9191
implicit val ec = as.dispatcher
92-
implicit val materializer = ActorMaterializer()
9392
private val logging = new AkkaLogging(actorSystem.log)
9493

9594
private val splunkApi = Path / "services" / "search" / "jobs" //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
@@ -98,12 +97,22 @@ class SplunkLogStore(
9897

9998
val maxPendingRequests = 500
10099

100+
def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
101+
val engine = SSLContext.getDefault.createSSLEngine(host, port)
102+
engine.setUseClientMode(true)
103+
104+
// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
105+
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
106+
107+
engine
108+
}
109+
101110
val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
102111
host = splunkConfig.host,
103112
port = splunkConfig.port,
104113
connectionContext =
105114
if (splunkConfig.disableSNI)
106-
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
115+
ConnectionContext.httpsClient(createInsecureSslEngine _)
107116
else Http().defaultClientHttpsContext)
108117

109118
override def fetchLogs(namespace: String,

common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import java.nio.file.{Files, Path}
2222
import java.time.Instant
2323
import java.util.EnumSet
2424

25-
import akka.stream.ActorMaterializer
25+
import akka.actor.ActorSystem
26+
import akka.stream.RestartSettings
2627
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
2728
import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
2829
import akka.util.ByteString
@@ -37,38 +38,38 @@ import scala.concurrent.duration._
3738
class ActivationFileStorage(logFilePrefix: String,
3839
logPath: Path,
3940
writeResultToFile: Boolean,
40-
actorMaterializer: ActorMaterializer,
41+
actorSystem: ActorSystem,
4142
logging: Logging) {
42-
43-
implicit val materializer = actorMaterializer
43+
implicit val system: ActorSystem = actorSystem
4444

4545
private var logFile = logPath
4646
private val bufferSize = 100.MB
4747
private val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
4848
private val writeToFile: Sink[ByteString, _] = MergeHub
4949
.source[ByteString]
5050
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
51-
.to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
52-
LogRotatorSink(() => {
53-
val maxSize = bufferSize.toBytes
54-
var bytesRead = maxSize
55-
element =>
56-
{
57-
val size = element.size
58-
59-
if (bytesRead + size > maxSize) {
60-
logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
61-
62-
logging.info(this, s"Rotating log file to '$logFile'")
63-
createLogFile(logFile)
64-
bytesRead = size
65-
Some(logFile)
66-
} else {
67-
bytesRead += size
68-
None
51+
.to(RestartSink.withBackoff(RestartSettings(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2)) {
52+
() =>
53+
LogRotatorSink(() => {
54+
val maxSize = bufferSize.toBytes
55+
var bytesRead = maxSize
56+
element =>
57+
{
58+
val size = element.size
59+
60+
if (bytesRead + size > maxSize) {
61+
logFile = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
62+
63+
logging.info(this, s"Rotating log file to '$logFile'")
64+
createLogFile(logFile)
65+
bytesRead = size
66+
Some(logFile)
67+
} else {
68+
bytesRead += size
69+
None
70+
}
6971
}
70-
}
71-
})
72+
})
7273
})
7374
.run()
7475

common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.database
2020
import java.time.Instant
2121

2222
import akka.actor.ActorSystem
23-
import akka.stream.ActorMaterializer
2423
import akka.http.scaladsl.model.HttpRequest
2524
import spray.json.JsObject
2625
import org.apache.openwhisk.common.{Logging, TransactionId}
@@ -199,5 +198,5 @@ trait ActivationStore {
199198
}
200199

201200
trait ActivationStoreProvider extends Spi {
202-
def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging): ActivationStore
201+
def instance(actorSystem: ActorSystem, logging: Logging): ActivationStore
203202
}

0 commit comments

Comments
 (0)