Skip to content

add system config options for success / failure levels to write blocking / non-blocking activations to db #5169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,19 @@ whisk {
# uniqueName + displayName 253 (max pod name length in Kube)
serdes-overhead = 6068 // 3034 bytes of metadata * 2 for extra headroom

# DEPRECATED, use store-blocking-result-level
# Disables database store for blocking + successful activations
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
disable-store-result = false

# Result level to store in db for blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
store-blocking-result-level = "STORE_ALWAYS"

# Result level to store in db for blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
store-non-blocking-result-level = "STORE_ALWAYS"

# Enable metadata logging of activations not stored in the database
unstored-logs-enabled = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,10 @@ object ConfigKeys {
val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
val swaggerUi = "whisk.swagger-ui"

/* DEPRECATED: disableStoreResult is deprecated for storeBlockingResultLevel */
val disableStoreResult = s"$activation.disable-store-result"
val storeBlockingResultLevel = s"$activation.store-blocking-result-level"
val storeNonBlockingResultLevel = s"$activation.store-non-blocking-result-level"
val unstoredLogsEnabled = s"$activation.unstored-logs-enabled"

val apacheClientConfig = "whisk.apache-client"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database

import java.time.Instant

import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
Expand All @@ -33,32 +32,43 @@ import scala.concurrent.Future
case class UserContext(user: Identity, request: HttpRequest = HttpRequest())

trait ActivationStore {

/* DEPRECATED: disableStoreResult config is now deprecated replaced with blocking activation store level */
protected val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
protected val storeBlockingResultLevelConfig = {
val configValue = ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeBlockingResultLevel))
if (disableStoreResultConfig && configValue == ActivationStoreLevel.STORE_ALWAYS) ActivationStoreLevel.STORE_FAILURES
else configValue
}
protected val storeNonBlockingResultLevelConfig =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also be in the try and ignored/set to false if it fails?
this lgtm in general, just a question of how long you want to tolerate the deprecated flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can discuss a date to remove it since we're not really following consistent releases and I'll send an email announcing the plan to remove it.

I don't think it needs to be moved since the existing behavior is to fail if it doesn't exist which maybe was unnecessary originally and we don't need to keep it long

ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeNonBlockingResultLevel))
protected val unstoredLogsEnabledConfig = loadConfigOrThrow[Boolean](ConfigKeys.unstoredLogsEnabled)

/**
* Checks if an activation should be stored in database and stores it.
*
* @param activation activation to store
* @param isBlockingActivation is activation blocking
* @param blockingStoreLevel do not store activation if successful and blocking
* @param nonBlockingStoreLevel do not store activation if successful and non-blocking
* @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
* @return Future containing DocInfo related to stored activation
*/
def storeAfterCheck(activation: WhiskActivation,
isBlockingActivation: Boolean,
disableStore: Option[Boolean],
blockingStoreLevel: Option[ActivationStoreLevel.Value],
nonBlockingStoreLevel: Option[ActivationStoreLevel.Value],
context: UserContext)(implicit transid: TransactionId,
notifier: Option[CacheChangeNotification],
logging: Logging): Future[DocInfo] = {
if (context.user.limits.storeActivations.getOrElse(true) &&
shouldStoreActivation(
activation.response.isSuccess,
activation.response,
isBlockingActivation,
transid.meta.extraLogging,
disableStore.getOrElse(disableStoreResultConfig))) {
blockingStoreLevel.getOrElse(storeBlockingResultLevelConfig),
nonBlockingStoreLevel.getOrElse(storeNonBlockingResultLevelConfig))) {

store(activation, context)
} else {
Expand Down Expand Up @@ -183,17 +193,29 @@ trait ActivationStore {
* - an activation in debug mode
* - activation stores is not disabled via a configuration parameter
*
* @param isSuccess is successful activation
* @param activationResponse to check
* @param isBlocking is blocking activation
* @param debugMode is logging header set to "on" for the invocation
* @param disableStore is disable store configured
* @param blockingStoreLevel level of activation status to store for blocking invocations
* @param nonBlockingStoreLevel level of activation status to store for blocking invocations
* @return Should the activation be stored to the database
*/
private def shouldStoreActivation(isSuccess: Boolean,
private def shouldStoreActivation(activationResponse: ActivationResponse,
isBlocking: Boolean,
debugMode: Boolean,
disableStore: Boolean): Boolean = {
!isSuccess || !isBlocking || debugMode || !disableStore
blockingStoreLevel: ActivationStoreLevel.Value,
nonBlockingStoreLevel: ActivationStoreLevel.Value): Boolean = {
def shouldStoreOnLevel(storageLevel: ActivationStoreLevel.Value): Boolean = {
storageLevel match {
case ActivationStoreLevel.STORE_ALWAYS => true
case ActivationStoreLevel.STORE_FAILURES => !activationResponse.isSuccess
case ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS =>
activationResponse.isContainerError || activationResponse.isWhiskError
}
}

debugMode || (isBlocking && shouldStoreOnLevel(blockingStoreLevel)) || (!isBlocking && shouldStoreOnLevel(
nonBlockingStoreLevel))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database

object ActivationStoreLevel extends Enumeration {
type ActivationStoreLevel = Value
val STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS = Value

def valueOf(value: String): Value = {
values
.find(_.toString == value.toUpperCase())
.getOrElse(throw new IllegalArgumentException(s"Invalid log level: $value"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
activationStore.storeAfterCheck(activation, false, None, context)
activationStore.storeAfterCheck(activation, false, None, None, context)
}

respondWithActivationIdHeader(triggerActivationId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,10 @@ protected[actions] trait PrimitiveActions {
}
}

activationStore.storeAfterCheck(activation, blockingComposition, None, context)(transid, notifier = None, logging)
activationStore.storeAfterCheck(activation, blockingComposition, None, None, context)(
transid,
notifier = None,
logging)

activation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected[actions] trait SequenceActions {
}
}

activationStore.storeAfterCheck(seqActivation, blockingSequence, None, context)(
activationStore.storeAfterCheck(seqActivation, blockingSequence, None, None, context)(
transid,
notifier = None,
logging)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class FPCInvokerReactive(config: WhiskConfig,
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging)
}

private def healthActivationClientFactory(f: ActorRefFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class InvokerReactive(
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging)
}

/** Creates a ContainerProxy Actor when being called. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.openwhisk.core.controller.test

import java.time.Instant

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext}
import org.apache.openwhisk.core.entity._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -79,7 +78,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
storeActivation(activation, false, false, context)
storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
Expand Down Expand Up @@ -114,7 +113,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
storeActivation(activation, false, false, context)
storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
Expand Down
Loading