-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Support array result for common action and sequence action #5290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ae9d8d6
1ebc52c
929fe41
3ba9723
c15de8f
b7235eb
6ba278a
a9451f3
c92bce3
d7de6e5
d5b6569
84d220f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ case class ActivationMessage(override val transid: TransactionId, | |
activationId: ActivationId, | ||
rootControllerIndex: ControllerInstanceId, | ||
blocking: Boolean, | ||
content: Option[JsObject], | ||
content: Option[JsValue], | ||
initArgs: Set[String] = Set.empty, | ||
lockedArgs: Map[String, String] = Map.empty, | ||
cause: Option[ActivationId] = None, | ||
|
@@ -380,9 +380,13 @@ object Activation extends DefaultJsonProtocol { | |
|
||
/** Get "StatusCode" from result response set by action developer * */ | ||
def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = { | ||
val statusCode = JsHelpers | ||
.getFieldPath(result.get.asJsObject, ERROR_FIELD, "statusCode") | ||
.orElse(JsHelpers.getFieldPath(result.get.asJsObject, "statusCode")) | ||
val statusCode: Option[JsValue] = result match { | ||
case Some(JsObject(fields)) => | ||
JsHelpers | ||
.getFieldPath(JsObject(fields), ERROR_FIELD, "statusCode") | ||
.orElse(JsHelpers.getFieldPath(JsObject(fields), "statusCode")) | ||
case _ => None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for the array result case and there could be no There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, for array result, seems can't get the userDefined statusCode |
||
} | ||
statusCode.map { | ||
case value => Try(value.convertTo[BigInt].intValue).toOption.getOrElse(BadRequest.intValue) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -201,6 +201,19 @@ object AkkaContainerClient { | |
result | ||
} | ||
|
||
/** A helper method to post one single request to a connection. Used for container tests. */ | ||
def postForJsArray(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)( | ||
implicit logging: Logging, | ||
as: ActorSystem, | ||
ec: ExecutionContext, | ||
tid: TransactionId): (Int, Option[JsArray]) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason that we can't just change the result type to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Actually, in order to support test, we can change below method to support return So here, i added another extra method to test return array |
||
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1) | ||
val response = executeRequestForJsArray(connection, endPoint, content) | ||
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures | ||
connection.close() | ||
result | ||
} | ||
|
||
/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */ | ||
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)( | ||
implicit logging: Logging, | ||
|
@@ -233,4 +246,24 @@ object AkkaContainerClient { | |
|
||
res | ||
} | ||
|
||
private def executeRequestForJsArray(connection: AkkaContainerClient, endpoint: String, content: JsValue)( | ||
implicit logging: Logging, | ||
as: ActorSystem, | ||
ec: ExecutionContext, | ||
tid: TransactionId): Future[(Int, Option[JsArray])] = { | ||
|
||
val res = connection | ||
.post(endpoint, content, true) | ||
.map({ | ||
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption) | ||
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container") | ||
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException() | ||
case Left(ConnectionError(t: java.net.SocketTimeoutException)) => | ||
throw new java.util.concurrent.TimeoutException() | ||
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage) | ||
}) | ||
|
||
res | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,8 +388,8 @@ class ElasticSearchActivationStore( | |
restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation] | ||
} | ||
|
||
private def restoreAnnotations(js: JsObject): JsObject = { | ||
val annotations = js.fields | ||
private def restoreAnnotations(js: JsValue): JsObject = { | ||
val annotations = js.asJsObject.fields | ||
.get("annotations") | ||
.map { anno => | ||
Try { | ||
|
@@ -399,10 +399,10 @@ class ElasticSearchActivationStore( | |
}.getOrElse(JsArray.empty) | ||
} | ||
.getOrElse(JsArray.empty) | ||
JsObject(js.fields.updated("annotations", annotations)) | ||
JsObject(js.asJsObject.fields.updated("annotations", annotations)) | ||
} | ||
|
||
private def restoreResponse(js: JsObject): JsObject = { | ||
private def restoreResponse(js: JsObject): JsValue = { | ||
val response = js.fields | ||
.get("response") | ||
.map { res => | ||
|
@@ -412,7 +412,10 @@ class ElasticSearchActivationStore( | |
.get("result") | ||
.map { r => | ||
val JsString(data) = r | ||
data.parseJson.asJsObject | ||
data.parseJson match { | ||
case JsArray(elements) => JsArray(elements) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Support |
||
case _ => data.parseJson.asJsObject | ||
} | ||
} | ||
.getOrElse(JsObject.empty) | ||
JsObject(temp.updated("result", result)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,7 +111,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { | |
* NOTE: the code is application error (since this response could be used as a response for the sequence | ||
* if the payload contains an error) | ||
*/ | ||
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload) | ||
protected[core] def payloadPlaceholder(payload: Option[JsValue]) = ActivationResponse(ApplicationError, payload) | ||
|
||
/** | ||
* Class of errors for invoker-container communication. | ||
|
@@ -203,7 +203,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { | |
truncated match { | ||
case None => | ||
val sizeOpt = Option(str).map(_.length) | ||
Try { str.parseJson.asJsObject } match { | ||
Try { str.parseJson } match { | ||
case scala.util.Success(result @ JsObject(fields)) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So even without There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
// If the response is a JSON object container an error field, accept it as the response error. | ||
val errorOpt = fields.get(ERROR_FIELD) | ||
|
@@ -222,6 +222,17 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { | |
developerError(errorContent, sizeOpt) | ||
} | ||
|
||
case scala.util.Success(result @ JsArray(_)) => | ||
if (res.okStatus) { | ||
success(Some(result), sizeOpt) | ||
} else { | ||
// Any non-200 code is treated as a container failure. We still need to check whether | ||
// there was a useful error message in there. | ||
val errorContent = invalidRunResponse(str).toJson | ||
//developerErrorWithLog(errorContent, sizeOpt, None) | ||
developerError(errorContent, sizeOpt) | ||
} | ||
|
||
case scala.util.Success(notAnObj) => | ||
// This should affect only blackbox containers, since our own containers should already test for that. | ||
developerError(invalidRunResponse(str), sizeOpt) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,7 @@ protected[actions] trait SequenceActions { | |
protected[actions] def invokeAction( | ||
user: Identity, | ||
action: WhiskActionMetaData, | ||
payload: Option[JsObject], | ||
payload: Option[JsValue], | ||
waitForResponse: Option[FiniteDuration], | ||
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] | ||
|
||
|
@@ -93,7 +93,7 @@ protected[actions] trait SequenceActions { | |
user: Identity, | ||
action: WhiskActionMetaData, | ||
components: Vector[FullyQualifiedEntityName], | ||
payload: Option[JsObject], | ||
payload: Option[JsValue], | ||
waitForOutermostResponse: Option[FiniteDuration], | ||
cause: Option[ActivationId], | ||
topmost: Boolean, | ||
|
@@ -266,7 +266,7 @@ protected[actions] trait SequenceActions { | |
user: Identity, | ||
seqAction: WhiskActionMetaData, | ||
seqActivationId: ActivationId, | ||
inputPayload: Option[JsObject], | ||
inputPayload: Option[JsValue], | ||
components: Vector[FullyQualifiedEntityName], | ||
cause: Option[ActivationId], | ||
atomicActionCnt: Int)(implicit transid: TransactionId): Future[SequenceAccounting] = { | ||
|
@@ -347,7 +347,12 @@ protected[actions] trait SequenceActions { | |
// the accounting no longer needs to hold a reference to it once the action is | ||
// invoked, so previousResponse.getAndSet(null) drops the reference at this point | ||
// which prevents dragging the previous response for the lifetime of the next activation | ||
val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject) | ||
val previousResult = accounting.previousResponse.getAndSet(null).result | ||
val inputPayload: Option[JsValue] = previousResult match { | ||
case Some(JsObject(fields)) => Some(JsObject(fields)) | ||
case Some(JsArray(elements)) => Some(JsArray(elements)) | ||
case _ => None | ||
} | ||
|
||
// invoke the action by calling the right method depending on whether it's an atomic action or a sequence | ||
val futureWhiskActivationTuple = action.toExecutableWhiskAction match { | ||
|
@@ -460,9 +465,10 @@ protected[actions] case class SequenceAccounting(atomicActionCnt: Int, | |
// check conditions on payload that may lead to interrupting the execution of the sequence | ||
// short-circuit the execution of the sequence iff the payload contains an error field | ||
// and is the result of an action return, not the initial payload | ||
val outputPayload = activation.response.result.map(_.asJsObject) | ||
val payloadContent = outputPayload getOrElse JsObject.empty | ||
val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD) | ||
val errorField: Option[JsValue] = activation.response.result match { | ||
case Some(JsObject(fields)) => fields.get(ActivationResponse.ERROR_FIELD) | ||
case _ => None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So users can't define the explicit error response when the result is a JSON array. Sometimes, libraries return an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I am not sure whether i understand correctly, if libraries return an |
||
} | ||
val withinSeqLimit = newCnt <= maxSequenceCnt | ||
|
||
if (withinSeqLimit && errorField.isEmpty) { | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option[JsObject]
Option[JsArray]
, e.g. the first action's JsArray result as the next action's input param.