Skip to content

Support array result for common action and sequence action #5290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ case class ActivationMessage(override val transid: TransactionId,
activationId: ActivationId,
rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
content: Option[JsObject],
content: Option[JsValue],
Copy link
Contributor Author

@ningyougang ningyougang Jul 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • For common action to pass parameter as dict, it would use Option[JsObject]
  • For sequence action to pass parameter as array, it would use Option[JsArray], e.g. the first action's JsArray result as the next action's input param.

initArgs: Set[String] = Set.empty,
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
Expand Down Expand Up @@ -380,9 +380,13 @@ object Activation extends DefaultJsonProtocol {

/** Get "StatusCode" from result response set by action developer * */
def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = {
val statusCode = JsHelpers
.getFieldPath(result.get.asJsObject, ERROR_FIELD, "statusCode")
.orElse(JsHelpers.getFieldPath(result.get.asJsObject, "statusCode"))
val statusCode: Option[JsValue] = result match {
case Some(JsObject(fields)) =>
JsHelpers
.getFieldPath(JsObject(fields), ERROR_FIELD, "statusCode")
.orElse(JsHelpers.getFieldPath(JsObject(fields), "statusCode"))
case _ => None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the array result case and there could be no statusCode field in the array result, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for array result, seems can't get the userDefined statusCode

}
statusCode.map {
case value => Try(value.convertTo[BigInt].intValue).toOption.getOrElse(BadRequest.intValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ object AkkaContainerClient {
result
}

/** A helper method to post one single request to a connection. Used for container tests. */
def postForJsArray(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsArray]) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason that we can't just change the result type to (Int, Option[JsValue])?

Copy link
Contributor Author

@ningyougang ningyougang Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • def runForJsArray is added for test case, the return value is (Int, Option[JsArray])
  • call chain here is: def runForJsArray -> private def syncPostForJsArray -> def postForJsArray

Actually, in order to support test, we can change below method to support return (Int, Option[JsValue]) also
image
But if we change the run directly, there would be a lot of changes in openwhisk repo and all runtime codes should change as well.

So here, i added another extra method to test return array
image
This is just for impact the original code as little as possible: #5290 (comment)

val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
val response = executeRequestForJsArray(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
result
}

/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)(
implicit logging: Logging,
Expand Down Expand Up @@ -233,4 +246,24 @@ object AkkaContainerClient {

res
}

private def executeRequestForJsArray(connection: AkkaContainerClient, endpoint: String, content: JsValue)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): Future[(Int, Option[JsArray])] = {

val res = connection
.post(endpoint, content, true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
})

res
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.util.ByteString
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json.JsObject
import spray.json.{JsObject, JsValue}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
Expand Down Expand Up @@ -159,7 +159,7 @@ trait Container {
}

/** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
def run(parameters: JsObject,
def run(parameters: JsValue,
environment: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ class ElasticSearchActivationStore(
restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation]
}

private def restoreAnnotations(js: JsObject): JsObject = {
val annotations = js.fields
private def restoreAnnotations(js: JsValue): JsObject = {
val annotations = js.asJsObject.fields
.get("annotations")
.map { anno =>
Try {
Expand All @@ -399,10 +399,10 @@ class ElasticSearchActivationStore(
}.getOrElse(JsArray.empty)
}
.getOrElse(JsArray.empty)
JsObject(js.fields.updated("annotations", annotations))
JsObject(js.asJsObject.fields.updated("annotations", annotations))
}

private def restoreResponse(js: JsObject): JsObject = {
private def restoreResponse(js: JsObject): JsValue = {
val response = js.fields
.get("response")
.map { res =>
Expand All @@ -412,7 +412,10 @@ class ElasticSearchActivationStore(
.get("result")
.map { r =>
val JsString(data) = r
data.parseJson.asJsObject
data.parseJson match {
case JsArray(elements) => JsArray(elements)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support store the JsArray result to ElasticSearch.

case _ => data.parseJson.asJsObject
}
}
.getOrElse(JsObject.empty)
JsObject(temp.updated("result", result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
* NOTE: the code is application error (since this response could be used as a response for the sequence
* if the payload contains an error)
*/
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload)
protected[core] def payloadPlaceholder(payload: Option[JsValue]) = ActivationResponse(ApplicationError, payload)

/**
* Class of errors for invoker-container communication.
Expand Down Expand Up @@ -203,7 +203,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
truncated match {
case None =>
val sizeOpt = Option(str).map(_.length)
Try { str.parseJson.asJsObject } match {
Try { str.parseJson } match {
case scala.util.Success(result @ JsObject(fields)) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So even without asJsObject, the result would match here?
Then, it was not necessary in the first place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • If without this pr, i think we can remove .asJsObject.
  • But with this pr, we need to support JsArray, must remove .asJsObject to match below codes
    image

// If the response is a JSON object container an error field, accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
Expand All @@ -222,6 +222,17 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
developerError(errorContent, sizeOpt)
}

case scala.util.Success(result @ JsArray(_)) =>
if (res.okStatus) {
success(Some(result), sizeOpt)
} else {
// Any non-200 code is treated as a container failure. We still need to check whether
// there was a useful error message in there.
val errorContent = invalidRunResponse(str).toJson
//developerErrorWithLog(errorContent, sizeOpt, None)
developerError(errorContent, sizeOpt)
}

case scala.util.Success(notAnObj) =>
// This should affect only blackbox containers, since our own containers should already test for that.
developerError(invalidRunResponse(str), sizeOpt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object Messages {
}

def invalidRunResponse(actualResponse: String) = {
"The action did not produce a valid JSON response" + {
"The action did not produce a valid JSON or JSON Array response" + {
Option(actualResponse) filter { _.nonEmpty } map { s =>
s": $s"
} getOrElse "."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,12 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
complete(Accepted, activationId.toJsObject)
}
case Success(Right(activation)) =>
val response = if (result) activation.resultAsJson else activation.toExtendedJson()

val response = activation.response.result match {
case Some(JsArray(elements)) =>
JsArray(elements)
case _ =>
if (result) activation.resultAsJson else activation.toExtendedJson()
}
respondWithActivationIdHeader(activation.activationId) {
if (activation.response.isSuccess) {
complete(OK, response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc
protected[controller] def invokeAction(
user: Identity,
action: WhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
action.toExecutableWhiskAction match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected[actions] trait PrimitiveActions {
user: Identity,
action: WhiskActionMetaData,
components: Vector[FullyQualifiedEntityName],
payload: Option[JsObject],
payload: Option[JsValue],
waitForOutermostResponse: Option[FiniteDuration],
cause: Option[ActivationId],
topmost: Boolean,
Expand Down Expand Up @@ -109,7 +109,7 @@ protected[actions] trait PrimitiveActions {
protected[actions] def invokeSingleAction(
user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {

Expand Down Expand Up @@ -152,12 +152,16 @@ protected[actions] trait PrimitiveActions {
private def invokeSimpleAction(
user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {

// merge package parameters with action (action parameters supersede), then merge in payload
val args = action.parameters merge payload
val args: Option[JsValue] = payload match {
case Some(JsObject(fields)) => action.parameters merge Some(JsObject(fields))
case Some(JsArray(elements)) => Some(JsArray(elements))
case _ => Some(action.parameters.toJsObject)
}
val activationId = activationIdFactory.make()

val startActivation = transid.started(
Expand All @@ -169,6 +173,10 @@ protected[actions] trait PrimitiveActions {
val startLoadbalancer =
transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")

val keySet = payload match {
case Some(JsObject(fields)) => Some(fields.keySet)
case _ => None
}
val message = ActivationMessage(
transid,
FullyQualifiedEntityName(action.namespace, action.name, Some(action.version), action.binding),
Expand All @@ -179,7 +187,7 @@ protected[actions] trait PrimitiveActions {
waitForResponse.isDefined,
args,
action.parameters.initParameters,
action.parameters.lockedParameters(payload.map(_.fields.keySet).getOrElse(Set.empty)),
action.parameters.lockedParameters(keySet.getOrElse(Set.empty)),
cause = cause,
WhiskTracerProvider.tracer.getTraceContext(transid))

Expand Down Expand Up @@ -271,7 +279,7 @@ protected[actions] trait PrimitiveActions {
*/
private def invokeComposition(user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId],
accounting: Option[CompositionAccounting] = None)(
Expand Down Expand Up @@ -319,7 +327,7 @@ protected[actions] trait PrimitiveActions {
* @param parentTid a parent transaction id
*/
private def invokeConductor(user: Identity,
payload: Option[JsObject],
payload: Option[JsValue],
session: Session,
parentTid: TransactionId): Future[ActivationResponse] = {

Expand All @@ -330,9 +338,13 @@ protected[actions] trait PrimitiveActions {
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
} else {
// inject state into payload if any
val params = session.state
.map(state => Some(JsObject(payload.getOrElse(JsObject.empty).fields ++ state.fields)))
.getOrElse(payload)
val params: Option[JsValue] = payload match {
case Some(JsObject(fields)) =>
session.state
.map(state => Some(JsObject(JsObject(fields).fields ++ state.fields)))
.getOrElse(payload)
case _ => None
}

// invoke conductor action
session.accounting.conductors += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected[actions] trait SequenceActions {
protected[actions] def invokeAction(
user: Identity,
action: WhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]]

Expand All @@ -93,7 +93,7 @@ protected[actions] trait SequenceActions {
user: Identity,
action: WhiskActionMetaData,
components: Vector[FullyQualifiedEntityName],
payload: Option[JsObject],
payload: Option[JsValue],
waitForOutermostResponse: Option[FiniteDuration],
cause: Option[ActivationId],
topmost: Boolean,
Expand Down Expand Up @@ -266,7 +266,7 @@ protected[actions] trait SequenceActions {
user: Identity,
seqAction: WhiskActionMetaData,
seqActivationId: ActivationId,
inputPayload: Option[JsObject],
inputPayload: Option[JsValue],
components: Vector[FullyQualifiedEntityName],
cause: Option[ActivationId],
atomicActionCnt: Int)(implicit transid: TransactionId): Future[SequenceAccounting] = {
Expand Down Expand Up @@ -347,7 +347,12 @@ protected[actions] trait SequenceActions {
// the accounting no longer needs to hold a reference to it once the action is
// invoked, so previousResponse.getAndSet(null) drops the reference at this point
// which prevents dragging the previous response for the lifetime of the next activation
val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
val previousResult = accounting.previousResponse.getAndSet(null).result
val inputPayload: Option[JsValue] = previousResult match {
case Some(JsObject(fields)) => Some(JsObject(fields))
case Some(JsArray(elements)) => Some(JsArray(elements))
case _ => None
}

// invoke the action by calling the right method depending on whether it's an atomic action or a sequence
val futureWhiskActivationTuple = action.toExecutableWhiskAction match {
Expand Down Expand Up @@ -460,9 +465,10 @@ protected[actions] case class SequenceAccounting(atomicActionCnt: Int,
// check conditions on payload that may lead to interrupting the execution of the sequence
// short-circuit the execution of the sequence iff the payload contains an error field
// and is the result of an action return, not the initial payload
val outputPayload = activation.response.result.map(_.asJsObject)
val payloadContent = outputPayload getOrElse JsObject.empty
val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
val errorField: Option[JsValue] = activation.response.result match {
case Some(JsObject(fields)) => fields.get(ActivationResponse.ERROR_FIELD)
case _ => None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So users can't define the explicit error response when the result is a JSON array.
But there would be no difference in the JSON-object-based action, right?

Sometimes, libraries return an error filed and OW considers that as an error of an action as well.
Since there would be no behavioral difference in the JSON-object-based actions, there would be no difference in existing semantics, right?

Copy link
Contributor Author

@ningyougang ningyougang Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • So users can't define the explicit error response when the result is a JSON array.
    Yes, can't define the explicit error response for a JSON array,
    but if we want to define the explicit error response, normally, user should return the JSON-object result.
  • But there would be no difference in the JSON-object-based action, right?
    For the JSON-object-based action, have no any bad influences on these actions, have no any change.
  • Regarding
Sometimes, libraries return an error filed and OW considers that as an error of an action as well.
Since there would be no behavioral difference in the JSON-object-based actions, there would be no difference in existing semantics, right?

I am not sure whether i understand correctly, if libraries return an error or user's codes has some error, normally, there would has catch(....) statement in user's code, and in catch statement, user can return the error or some excetion error with JSON-object result.
If user codes run well without any error or exception, can return array result finally.

}
val withinSeqLimit = newCnt <= maxSequenceCnt

if (withinSeqLimit && errorField.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,25 +1082,28 @@ object ContainerProxy {
* @param initArgs set of parameters to treat as initialization arguments
* @return A partition of the arguments into an environment variables map and the JsObject argument to the action
*/
def partitionArguments(content: Option[JsObject], initArgs: Set[String]): (Map[String, JsValue], JsObject) = {
def partitionArguments(content: Option[JsValue], initArgs: Set[String]): (Map[String, JsValue], JsValue) = {
content match {
case None => (Map.empty, JsObject.empty)
case Some(js) if initArgs.isEmpty => (Map.empty, js)
case Some(js) =>
val (env, args) = js.fields.partition(k => initArgs.contains(k._1))
case None => (Map.empty, JsObject.empty)
case Some(JsArray(elements)) => (Map.empty, JsArray(elements))
case Some(JsObject(fields)) if initArgs.isEmpty => (Map.empty, JsObject(fields))
case Some(JsObject(fields)) =>
val (env, args) = fields.partition(k => initArgs.contains(k._1))
(env, JsObject(args))
}
}

def unlockArguments(content: Option[JsObject],
def unlockArguments(content: Option[JsValue],
lockedArgs: Map[String, String],
decoder: ParameterEncryption): Option[JsObject] = {
content.map {
case JsObject(fields) =>
JsObject(fields.map {
decoder: ParameterEncryption): Option[JsValue] = {
content match {
case Some(JsObject(fields)) =>
Some(JsObject(fields.map {
case (k, v: JsString) if lockedArgs.contains(k) => (k -> decoder.encryptor(lockedArgs(k)).decrypt(v))
case p => p
})
}))
// keep the original for other type(e.g. JsArray)
case contentValue => contentValue
}
}
}
Expand Down
Loading