diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index ff5f8e2af91..6c69e11b293 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -438,10 +438,19 @@ whisk { # uniqueName + displayName 253 (max pod name length in Kube) serdes-overhead = 6068 // 3034 bytes of metadata * 2 for extra headroom + # DEPRECATED, use store-blocking-result-level # Disables database store for blocking + successful activations # invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored disable-store-result = false + # Result level to store in db for blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS) + # invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored + store-blocking-result-level = "STORE_ALWAYS" + + # Result level to store in db for non-blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS) + # invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored + store-non-blocking-result-level = "STORE_ALWAYS" + # Enable metadata logging of activations not stored in the database unstored-logs-enabled = false } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index e50fbb4c473..2a6490b6007 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -286,7 +286,10 @@ object ConfigKeys { val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only" val swaggerUi = "whisk.swagger-ui" + /* DEPRECATED: disableStoreResult is deprecated for storeBlockingResultLevel */ val disableStoreResult = s"$activation.disable-store-result" + val storeBlockingResultLevel = s"$activation.store-blocking-result-level" + val storeNonBlockingResultLevel = s"$activation.store-non-blocking-result-level" val unstoredLogsEnabled = s"$activation.unstored-logs-enabled" val apacheClientConfig = "whisk.apache-client" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala index 83651507b86..9981bff16f0 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala @@ -18,7 +18,6 @@ package org.apache.openwhisk.core.database import java.time.Instant - import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpRequest import spray.json.JsObject @@ -33,8 +32,24 @@ import scala.concurrent.Future case class UserContext(user: Identity, request: HttpRequest = HttpRequest()) trait ActivationStore { + val logging: Logging + /* DEPRECATED: disableStoreResult config is now deprecated replaced with blocking activation store level (storeBlockingResultLevel) */ protected val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult) + protected val storeBlockingResultLevelConfig = { + try { + ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeBlockingResultLevel)) + } catch { + case _: Exception => + val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult) + logging.warn( + this, + s"The config ${ConfigKeys.disableStoreResult} being used is deprecated. Please use the replacement config ${ConfigKeys.storeBlockingResultLevel}") + if (disableStoreResultConfig) ActivationStoreLevel.STORE_FAILURES else ActivationStoreLevel.STORE_ALWAYS + } + } + protected val storeNonBlockingResultLevelConfig = + ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeNonBlockingResultLevel)) protected val unstoredLogsEnabledConfig = loadConfigOrThrow[Boolean](ConfigKeys.unstoredLogsEnabled) /** @@ -42,6 +57,8 @@ trait ActivationStore { * * @param activation activation to store * @param isBlockingActivation is activation blocking + * @param blockingStoreLevel do not store activation if successful and blocking + * @param nonBlockingStoreLevel do not store activation if successful and non-blocking * @param context user and request context * @param transid transaction ID for request * @param notifier cache change notifier @@ -49,16 +66,18 @@ trait ActivationStore { */ def storeAfterCheck(activation: WhiskActivation, isBlockingActivation: Boolean, - disableStore: Option[Boolean], + blockingStoreLevel: Option[ActivationStoreLevel.Value], + nonBlockingStoreLevel: Option[ActivationStoreLevel.Value], context: UserContext)(implicit transid: TransactionId, notifier: Option[CacheChangeNotification], logging: Logging): Future[DocInfo] = { if (context.user.limits.storeActivations.getOrElse(true) && shouldStoreActivation( - activation.response.isSuccess, + activation.response, isBlockingActivation, transid.meta.extraLogging, - disableStore.getOrElse(disableStoreResultConfig))) { + blockingStoreLevel.getOrElse(storeBlockingResultLevelConfig), + nonBlockingStoreLevel.getOrElse(storeNonBlockingResultLevelConfig))) { store(activation, context) } else { @@ -183,17 +202,29 @@ trait ActivationStore { * - an activation in debug mode * - activation stores is not disabled via a configuration parameter * - * @param isSuccess is successful activation + * @param activationResponse to check * @param isBlocking is blocking activation * @param debugMode is logging header set to "on" for the invocation - * @param disableStore is disable store configured + * @param blockingStoreLevel level of activation status to store for blocking invocations + * @param nonBlockingStoreLevel level of activation status to store for blocking invocations * @return Should the activation be stored to the database */ - private def shouldStoreActivation(isSuccess: Boolean, + private def shouldStoreActivation(activationResponse: ActivationResponse, isBlocking: Boolean, debugMode: Boolean, - disableStore: Boolean): Boolean = { - !isSuccess || !isBlocking || debugMode || !disableStore + blockingStoreLevel: ActivationStoreLevel.Value, + nonBlockingStoreLevel: ActivationStoreLevel.Value): Boolean = { + def shouldStoreOnLevel(storageLevel: ActivationStoreLevel.Value): Boolean = { + storageLevel match { + case ActivationStoreLevel.STORE_ALWAYS => true + case ActivationStoreLevel.STORE_FAILURES => !activationResponse.isSuccess + case ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS => + activationResponse.isContainerError || activationResponse.isWhiskError + } + } + + debugMode || (isBlocking && shouldStoreOnLevel(blockingStoreLevel)) || (!isBlocking && shouldStoreOnLevel( + nonBlockingStoreLevel)) } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala new file mode 100644 index 00000000000..7f37906987a --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database + +object ActivationStoreLevel extends Enumeration { + type ActivationStoreLevel = Value + val STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS = Value + + def valueOf(value: String): Value = { + values + .find(_.toString == value.toUpperCase()) + .getOrElse(throw new IllegalArgumentException(s"Invalid log level: $value")) + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala index 1e2ab886da1..9b62f728824 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala @@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity._ import scala.concurrent.Future import scala.util.{Failure, Success} -class ArtifactActivationStore(actorSystem: ActorSystem, logging: Logging) extends ActivationStore { +class ArtifactActivationStore(actorSystem: ActorSystem, override val logging: Logging) extends ActivationStore { implicit val executionContext = actorSystem.dispatcher diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala index 39f5a0cae23..50504263f90 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala @@ -57,7 +57,7 @@ case class ElasticSearchActivationStoreConfig(protocol: String, class ElasticSearchActivationStore( httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, elasticSearchConfig: ElasticSearchActivationStoreConfig, - useBatching: Boolean = false)(implicit actorSystem: ActorSystem, logging: Logging) + useBatching: Boolean = false)(implicit actorSystem: ActorSystem, override val logging: Logging) extends ActivationStore { import com.sksamuel.elastic4s.http.ElasticDsl._ diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala index 7b50685891a..8d45ff02fc0 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala @@ -18,21 +18,16 @@ package org.apache.openwhisk.core.database.memory import java.time.Instant - import akka.actor.ActorSystem -import org.apache.openwhisk.common.{Logging, TransactionId, WhiskInstants} -import org.apache.openwhisk.core.database.{ - ActivationStore, - ActivationStoreProvider, - CacheChangeNotification, - UserContext -} +import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants} +import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext} import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation} import spray.json.{JsNumber, JsObject} import scala.concurrent.Future object NoopActivationStore extends ActivationStore with WhiskInstants { + override val logging = new PrintStreamLogging() private val emptyInfo = DocInfo("foo") private val emptyCount = JsObject("activations" -> JsNumber(0)) private val dummyActivation = WhiskActivation( diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala index c70caee1705..57510993a56 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala @@ -161,7 +161,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI { triggerActivation } .map { activation => - activationStore.storeAfterCheck(activation, false, None, context) + activationStore.storeAfterCheck(activation, false, None, None, context) } respondWithActivationIdHeader(triggerActivationId) { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala index 621a10e2f19..19ca12e9f03 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala @@ -600,7 +600,10 @@ protected[actions] trait PrimitiveActions { } } - activationStore.storeAfterCheck(activation, blockingComposition, None, context)(transid, notifier = None, logging) + activationStore.storeAfterCheck(activation, blockingComposition, None, None, context)( + transid, + notifier = None, + logging) activation } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala index 619ccdbd83b..0bc082bf35b 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala @@ -181,7 +181,7 @@ protected[actions] trait SequenceActions { } } - activationStore.storeAfterCheck(seqActivation, blockingSequence, None, context)( + activationStore.storeAfterCheck(seqActivation, blockingSequence, None, None, context)( transid, notifier = None, logging) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index 37c9ffb4a04..2280ce76534 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -366,7 +366,8 @@ class FPCPoolBalancer(config: WhiskConfig, // and complete the promise with a failure if necessary activationPromises .remove(aid) - .foreach(_.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet"))) + .foreach( + _.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet"))) } // Active acks that are received here are strictly from user actions - health actions are not part of diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index 8a0fdc4720f..8158fd3ab27 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -178,7 +178,7 @@ class FPCInvokerReactive(config: WhiskConfig, /** Stores an activation in the database. */ private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => { implicit val transid: TransactionId = tid - activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging) + activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging) } private def healthActivationClientFactory(f: ActorRefFactory, diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index f2d36b144aa..e32ece2fce8 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -143,7 +143,7 @@ class InvokerReactive( /** Stores an activation in the database. */ private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => { implicit val transid: TransactionId = tid - activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging) + activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging) } /** Creates a ContainerProxy Actor when being called. */ diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala index 962b5da87d8..cc391a18631 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala @@ -18,14 +18,13 @@ package org.apache.openwhisk.core.controller.test import java.time.Instant - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.server.Route import org.apache.openwhisk.core.controller.WhiskActionsApi import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig -import org.apache.openwhisk.core.database.UserContext +import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext} import org.apache.openwhisk.core.entity._ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -79,7 +78,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction // storing the activation in the db will allow the db polling to retrieve it // the test harness makes sure the activation id observed by the test matches // the one generated by the api handler - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) try { Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -114,7 +113,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction // storing the activation in the db will allow the db polling to retrieve it // the test harness makes sure the activation id observed by the test matches // the one generated by the api handler - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) try { Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check { status should be(InternalServerError) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala index a24339718d3..5e266ddb4c2 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala @@ -18,7 +18,6 @@ package org.apache.openwhisk.core.controller.test import java.time.{Clock, Instant} - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.server.Route @@ -31,7 +30,7 @@ import org.apache.openwhisk.core.entitlement.Collection import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.http.{ErrorResponse, Messages} -import org.apache.openwhisk.core.database.UserContext +import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext} /** * Tests Activations API. @@ -95,7 +94,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = Instant.now) }.toList try { - (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context)) + (notExpectedActivations ++ activations).foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsInNamespace(namespace, 2, context) org.apache.openwhisk.utils.retry { @@ -179,7 +179,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi }.toList try { - (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context)) + (notExpectedActivations ++ activations).foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsInNamespace(namespace, 2, context) checkCount("", 2) @@ -254,7 +255,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = now.plusSeconds(30))) // should match try { - (notExpectedActivations ++ activations).foreach(storeActivation(_, false, false, context)) + (notExpectedActivations ++ activations).foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsInNamespace(namespace, activations.length, context) { // get between two time stamps @@ -363,7 +365,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi annotations = Parameters("path", s"${namespace.asString}/pkg/xyz")) }.toList try { - (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation(_, false, false, context)) + (notExpectedActivations ++ activations ++ activationsInPackage).foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length, context) waitOnListActivationsMatchingName( namespace, @@ -479,7 +482,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi }.toList try { - activations.foreach(storeActivation(_, false, false, context)) + activations.foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsInNamespace(namespace, activations.size, context) Get(s"$collectionPath?skip=1") ~> Route.seal(routes(creds)) ~> check { @@ -503,7 +507,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi }.toList try { - activations.foreach(storeActivation(_, false, false, context)) + activations.foreach( + storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)) waitOnListActivationsInNamespace(namespace, activations.size, context) Get(s"$collectionPath?limit=1") ~> Route.seal(routes(creds)) ~> check { @@ -533,7 +538,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -570,7 +575,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -583,7 +588,26 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } //// GET /activations/id/result when db store is disabled - it should "return activation empty when db store is disabled" in { + it should "return activation empty when db store is set to failures for successful blocking" in { + implicit val tid = transid() + val activation = + WhiskActivation( + namespace, + aname(), + creds.subject, + ActivationId.generate(), + start = Instant.now, + end = Instant.now) + + storeActivation(activation, true, ActivationStoreLevel.STORE_FAILURES, ActivationStoreLevel.STORE_ALWAYS, context) + + Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { + status should be(NotFound) + } + } + + //// GET /activations/id/result when db store is disabled + it should "return activation empty when db store is set to failures for non-blocking" in { implicit val tid = transid() val activation = WhiskActivation( @@ -594,7 +618,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) - storeActivation(activation, true, true, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_FAILURES, context) Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { status should be(NotFound) @@ -602,7 +626,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } //// GET /activations/id/result when store is disabled and activation is not blocking - it should "get activation result by id when db store is disabled and activation is not blocking" in { + it should "get activation result by id when db store is disabled for successful blocking and activation is not blocking" in { implicit val tid = transid() val activation = WhiskActivation( @@ -613,7 +637,12 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation, false, true, context) + storeActivation( + activation, + false, + ActivationStoreLevel.STORE_FAILURES, + ActivationStoreLevel.STORE_ALWAYS, + context) Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -626,7 +655,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } //// GET /activations/id/result when store is disabled and activation is unsuccessful - it should "get activation result by id when db store is disabled and activation is unsuccessful" in { + it should "get activation result by id when db store is set to failures and activation is unsuccessful" in { implicit val tid = transid() val activation = WhiskActivation( @@ -638,7 +667,65 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = Instant.now, response = ActivationResponse.whiskError("activation error")) try { - storeActivation(activation, true, true, context) + storeActivation( + activation, + true, + ActivationStoreLevel.STORE_FAILURES, + ActivationStoreLevel.STORE_FAILURES, + context) + + Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { + status should be(OK) + val response = responseAs[JsObject] + response should be(activation.response.toExtendedJson) + } + } finally { + deleteActivation(ActivationId(activation.docid.asString), context) + } + } + + it should "return activation empty when db store is set to not application failures and activation is application failure" in { + implicit val tid = transid() + val activation = + WhiskActivation( + namespace, + aname(), + creds.subject, + ActivationId.generate(), + start = Instant.now, + end = Instant.now, + response = ActivationResponse.applicationError("activation error")) + + storeActivation( + activation, + true, + ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS, + ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS, + context) + + Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { + status should be(NotFound) + } + } + + it should "get activation result by id when db store is set to not application failures failures and activation is unsuccessful" in { + implicit val tid = transid() + val activation = + WhiskActivation( + namespace, + aname(), + creds.subject, + ActivationId.generate(), + start = Instant.now, + end = Instant.now, + response = ActivationResponse.whiskError("activation error")) + try { + storeActivation( + activation, + true, + ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS, + ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS, + context) Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -662,7 +749,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -685,7 +772,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi ActivationId.generate(), start = Instant.now, end = Instant.now) - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) try { Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check { @@ -758,7 +845,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi val activation = new BadActivation(namespace, aname(), creds.subject, ActivationId.generate(), Instant.now, Instant.now) - storeActivation(activation, false, false, context) + storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context) Get(s"$collectionPath/${activation.activationId}") ~> Route.seal(routes(creds)) ~> check { status should be(InternalServerError) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala index 95c527cbc81..13f95721175 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala @@ -34,14 +34,19 @@ import org.apache.openwhisk.core.{FeatureFlags, WhiskConfig} import org.apache.openwhisk.core.connector.ActivationMessage import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.controller.{CustomHeaders, RestApiCommons, WhiskServices} -import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, DocumentFactory} +import org.apache.openwhisk.core.database.{ + ActivationStoreLevel, + ActivationStoreProvider, + CacheChangeNotification, + DocumentFactory, + UserContext +} import org.apache.openwhisk.core.database.test.DbUtils import org.apache.openwhisk.core.entitlement._ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.test.ExecHelpers import org.apache.openwhisk.core.loadBalancer.LoadBalancer import org.apache.openwhisk.spi.SpiLoader -import org.apache.openwhisk.core.database.UserContext protected trait ControllerTestCommon extends FlatSpec @@ -127,12 +132,15 @@ protected trait ControllerTestCommon def storeActivation( activation: WhiskActivation, isBlockingActivation: Boolean, - disableStore: Boolean, + blockingStoreLevel: ActivationStoreLevel.Value, + nonBlockingStoreLevel: ActivationStoreLevel.Value, context: UserContext)(implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = { - val docFuture = activationStore.storeAfterCheck(activation, isBlockingActivation, Some(disableStore), context)( - transid, - notifier = None, - logging) + val docFuture = activationStore.storeAfterCheck( + activation, + isBlockingActivation, + Some(blockingStoreLevel), + Some(nonBlockingStoreLevel), + context)(transid, notifier = None, logging) val doc = Await.result(docFuture, timeout) assert(doc != null) doc