Skip to content

Commit a739454

Browse files
committed
Use max-connection-pool as queue size
1 parent a4f4b2d commit a739454

File tree

1 file changed

+13
-23
lines changed

1 file changed

+13
-23
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,26 @@ package org.apache.openwhisk.core.containerpool
2020
import akka.actor.ActorSystem
2121
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
2222
import akka.http.scaladsl.marshalling.Marshal
23-
import akka.http.scaladsl.model.HttpMethods
24-
import akka.http.scaladsl.model.HttpRequest
25-
import akka.http.scaladsl.model.HttpResponse
26-
import akka.http.scaladsl.model.MediaTypes
27-
import akka.http.scaladsl.model.MessageEntity
28-
import akka.http.scaladsl.model.StatusCodes
23+
import akka.http.scaladsl.model._
2924
import akka.http.scaladsl.model.headers.Accept
3025
import akka.http.scaladsl.unmarshalling.Unmarshal
3126
import akka.stream.StreamTcpException
32-
import akka.stream.scaladsl.Sink
33-
import akka.stream.scaladsl.Source
27+
import akka.stream.scaladsl.{Sink, Source}
3428
import akka.util.ByteString
35-
import scala.concurrent.Await
36-
import scala.concurrent.ExecutionContext
37-
import scala.concurrent.Future
38-
import scala.concurrent.TimeoutException
39-
import scala.concurrent.duration._
40-
import scala.util.Try
41-
import scala.util.control.NonFatal
42-
import spray.json._
43-
import org.apache.openwhisk.common.Logging
4429
import org.apache.openwhisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
45-
import org.apache.openwhisk.common.MetricEmitter
46-
import org.apache.openwhisk.common.TransactionId
47-
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
48-
import org.apache.openwhisk.core.entity.ActivationResponse._
49-
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
30+
import org.apache.openwhisk.common.{Logging, MetricEmitter, TransactionId}
31+
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerHttpError, _}
5032
import org.apache.openwhisk.core.entity.size.SizeLong
33+
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
5134
import org.apache.openwhisk.http.PoolingRestClient
35+
import pureconfig.loadConfigOrThrow
36+
import spray.json._
5237

5338
import java.time.Instant
39+
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
40+
import scala.concurrent.duration._
41+
import scala.util.Try
42+
import scala.util.control.NonFatal
5443

5544
/**
5645
* This HTTP client is used only in the invoker to communicate with the action container.
@@ -193,6 +182,7 @@ protected class AkkaContainerClient(
193182
}
194183

195184
object AkkaContainerClient {
185+
private val queueSize = loadConfigOrThrow[Int]("akka.http.host-connection-pool.max-connections")
196186

197187
/** A helper method to post one single request to a connection. Used for container tests. */
198188
def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
@@ -226,7 +216,7 @@ object AkkaContainerClient {
226216
tid: TransactionId,
227217
as: ActorSystem,
228218
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
229-
val connection = new AkkaContainerClient(host, port, timeout, 1)
219+
val connection = new AkkaContainerClient(host, port, timeout, queueSize)
230220
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
231221
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
232222
connection.close()

0 commit comments

Comments
 (0)