Skip to content

add system config options for success / failure levels to write blocking / non-blocking activations to db #5169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,19 @@ whisk {
# uniqueName + displayName 253 (max pod name length in Kube)
serdes-overhead = 6068 // 3034 bytes of metadata * 2 for extra headroom

# DEPRECATED, use store-blocking-result-level
# Disables database store for blocking + successful activations
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
disable-store-result = false

# Result level to store in db for blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
store-blocking-result-level = "STORE_ALWAYS"

# Result level to store in db for non-blocking activations (STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force the activation to be stored
store-non-blocking-result-level = "STORE_ALWAYS"

# Enable metadata logging of activations not stored in the database
unstored-logs-enabled = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,10 @@ object ConfigKeys {
val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
val swaggerUi = "whisk.swagger-ui"

/* DEPRECATED: disableStoreResult is deprecated for storeBlockingResultLevel */
val disableStoreResult = s"$activation.disable-store-result"
val storeBlockingResultLevel = s"$activation.store-blocking-result-level"
val storeNonBlockingResultLevel = s"$activation.store-non-blocking-result-level"
val unstoredLogsEnabled = s"$activation.unstored-logs-enabled"

val apacheClientConfig = "whisk.apache-client"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database

import java.time.Instant

import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
Expand All @@ -33,32 +32,52 @@ import scala.concurrent.Future
case class UserContext(user: Identity, request: HttpRequest = HttpRequest())

trait ActivationStore {
val logging: Logging

/* DEPRECATED: disableStoreResult config is now deprecated replaced with blocking activation store level (storeBlockingResultLevel) */
protected val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
protected val storeBlockingResultLevelConfig = {
try {
ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeBlockingResultLevel))
} catch {
case _: Exception =>
val disableStoreResultConfig = loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
logging.warn(
this,
s"The config ${ConfigKeys.disableStoreResult} being used is deprecated. Please use the replacement config ${ConfigKeys.storeBlockingResultLevel}")
if (disableStoreResultConfig) ActivationStoreLevel.STORE_FAILURES else ActivationStoreLevel.STORE_ALWAYS
}
}
protected val storeNonBlockingResultLevelConfig =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also be in the try and ignored/set to false if it fails?
this lgtm in general, just a question of how long you want to tolerate the deprecated flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can discuss a date to remove it since we're not really following consistent releases and I'll send an email announcing the plan to remove it.

I don't think it needs to be moved since the existing behavior is to fail if it doesn't exist which maybe was unnecessary originally and we don't need to keep it long

ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeNonBlockingResultLevel))
protected val unstoredLogsEnabledConfig = loadConfigOrThrow[Boolean](ConfigKeys.unstoredLogsEnabled)

/**
* Checks if an activation should be stored in database and stores it.
*
* @param activation activation to store
* @param isBlockingActivation is activation blocking
* @param blockingStoreLevel do not store activation if successful and blocking
* @param nonBlockingStoreLevel do not store activation if successful and non-blocking
* @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
* @return Future containing DocInfo related to stored activation
*/
def storeAfterCheck(activation: WhiskActivation,
isBlockingActivation: Boolean,
disableStore: Option[Boolean],
blockingStoreLevel: Option[ActivationStoreLevel.Value],
nonBlockingStoreLevel: Option[ActivationStoreLevel.Value],
context: UserContext)(implicit transid: TransactionId,
notifier: Option[CacheChangeNotification],
logging: Logging): Future[DocInfo] = {
if (context.user.limits.storeActivations.getOrElse(true) &&
shouldStoreActivation(
activation.response.isSuccess,
activation.response,
isBlockingActivation,
transid.meta.extraLogging,
disableStore.getOrElse(disableStoreResultConfig))) {
blockingStoreLevel.getOrElse(storeBlockingResultLevelConfig),
nonBlockingStoreLevel.getOrElse(storeNonBlockingResultLevelConfig))) {

store(activation, context)
} else {
Expand Down Expand Up @@ -183,17 +202,29 @@ trait ActivationStore {
* - an activation in debug mode
* - activation stores is not disabled via a configuration parameter
*
* @param isSuccess is successful activation
* @param activationResponse to check
* @param isBlocking is blocking activation
* @param debugMode is logging header set to "on" for the invocation
* @param disableStore is disable store configured
* @param blockingStoreLevel level of activation status to store for blocking invocations
* @param nonBlockingStoreLevel level of activation status to store for blocking invocations
* @return Should the activation be stored to the database
*/
private def shouldStoreActivation(isSuccess: Boolean,
private def shouldStoreActivation(activationResponse: ActivationResponse,
isBlocking: Boolean,
debugMode: Boolean,
disableStore: Boolean): Boolean = {
!isSuccess || !isBlocking || debugMode || !disableStore
blockingStoreLevel: ActivationStoreLevel.Value,
nonBlockingStoreLevel: ActivationStoreLevel.Value): Boolean = {
def shouldStoreOnLevel(storageLevel: ActivationStoreLevel.Value): Boolean = {
storageLevel match {
case ActivationStoreLevel.STORE_ALWAYS => true
case ActivationStoreLevel.STORE_FAILURES => !activationResponse.isSuccess
case ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS =>
activationResponse.isContainerError || activationResponse.isWhiskError
}
}

debugMode || (isBlocking && shouldStoreOnLevel(blockingStoreLevel)) || (!isBlocking && shouldStoreOnLevel(
nonBlockingStoreLevel))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database

object ActivationStoreLevel extends Enumeration {
type ActivationStoreLevel = Value
val STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS = Value

def valueOf(value: String): Value = {
values
.find(_.toString == value.toUpperCase())
.getOrElse(throw new IllegalArgumentException(s"Invalid log level: $value"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity._
import scala.concurrent.Future
import scala.util.{Failure, Success}

class ArtifactActivationStore(actorSystem: ActorSystem, logging: Logging) extends ActivationStore {
class ArtifactActivationStore(actorSystem: ActorSystem, override val logging: Logging) extends ActivationStore {

implicit val executionContext = actorSystem.dispatcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ case class ElasticSearchActivationStoreConfig(protocol: String,
class ElasticSearchActivationStore(
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
elasticSearchConfig: ElasticSearchActivationStoreConfig,
useBatching: Boolean = false)(implicit actorSystem: ActorSystem, logging: Logging)
useBatching: Boolean = false)(implicit actorSystem: ActorSystem, override val logging: Logging)
extends ActivationStore {

import com.sksamuel.elastic4s.http.ElasticDsl._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,16 @@
package org.apache.openwhisk.core.database.memory

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.openwhisk.common.{Logging, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.{
ActivationStore,
ActivationStoreProvider,
CacheChangeNotification,
UserContext
}
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext}
import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation}
import spray.json.{JsNumber, JsObject}

import scala.concurrent.Future

object NoopActivationStore extends ActivationStore with WhiskInstants {
override val logging = new PrintStreamLogging()
private val emptyInfo = DocInfo("foo")
private val emptyCount = JsObject("activations" -> JsNumber(0))
private val dummyActivation = WhiskActivation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
activationStore.storeAfterCheck(activation, false, None, context)
activationStore.storeAfterCheck(activation, false, None, None, context)
}

respondWithActivationIdHeader(triggerActivationId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,10 @@ protected[actions] trait PrimitiveActions {
}
}

activationStore.storeAfterCheck(activation, blockingComposition, None, context)(transid, notifier = None, logging)
activationStore.storeAfterCheck(activation, blockingComposition, None, None, context)(
transid,
notifier = None,
logging)

activation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected[actions] trait SequenceActions {
}
}

activationStore.storeAfterCheck(seqActivation, blockingSequence, None, context)(
activationStore.storeAfterCheck(seqActivation, blockingSequence, None, None, context)(
transid,
notifier = None,
logging)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ class FPCPoolBalancer(config: WhiskConfig,
// and complete the promise with a failure if necessary
activationPromises
.remove(aid)
.foreach(_.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet")))
.foreach(
_.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet")))
}

// Active acks that are received here are strictly from user actions - health actions are not part of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class FPCInvokerReactive(config: WhiskConfig,
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging)
}

private def healthActivationClientFactory(f: ActorRefFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class InvokerReactive(
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
activationStore.storeAfterCheck(activation, isBlocking, None, None, context)(tid, notifier = None, logging)
}

/** Creates a ContainerProxy Actor when being called. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.openwhisk.core.controller.test

import java.time.Instant

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext}
import org.apache.openwhisk.core.entity._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -79,7 +78,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
storeActivation(activation, false, false, context)
storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(OK)
Expand Down Expand Up @@ -114,7 +113,7 @@ class ActionsApiWithDbPollingTests extends ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve it
// the test harness makes sure the activation id observed by the test matches
// the one generated by the api handler
storeActivation(activation, false, false, context)
storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS, ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
Expand Down
Loading