Skip to content

Commit 62b8a50

Browse files
authored
Support array result for common action and sequence action (#5290)
* Support array result * Make controller accept json array * Make elasticsearch support json array Couchdb already suports * Make go runtime test cases due to depend on this * Add test case for array result for nodejs runtime * Make sequence action to support array result * Optimize sequence action to support array result * Fix test case for sequence action feature * Add test case for sequence action This test case is just for nodejs * Add extra method runForJsArray for runtime tests * Fix build error * Fix review comment
1 parent d92485c commit 62b8a50

File tree

28 files changed

+342
-108
lines changed

28 files changed

+342
-108
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ case class ActivationMessage(override val transid: TransactionId,
5757
activationId: ActivationId,
5858
rootControllerIndex: ControllerInstanceId,
5959
blocking: Boolean,
60-
content: Option[JsObject],
60+
content: Option[JsValue],
6161
initArgs: Set[String] = Set.empty,
6262
lockedArgs: Map[String, String] = Map.empty,
6363
cause: Option[ActivationId] = None,
@@ -380,9 +380,13 @@ object Activation extends DefaultJsonProtocol {
380380

381381
/** Get "StatusCode" from result response set by action developer * */
382382
def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = {
383-
val statusCode = JsHelpers
384-
.getFieldPath(result.get.asJsObject, ERROR_FIELD, "statusCode")
385-
.orElse(JsHelpers.getFieldPath(result.get.asJsObject, "statusCode"))
383+
val statusCode: Option[JsValue] = result match {
384+
case Some(JsObject(fields)) =>
385+
JsHelpers
386+
.getFieldPath(JsObject(fields), ERROR_FIELD, "statusCode")
387+
.orElse(JsHelpers.getFieldPath(JsObject(fields), "statusCode"))
388+
case _ => None
389+
}
386390
statusCode.map {
387391
case value => Try(value.convertTo[BigInt].intValue).toOption.getOrElse(BadRequest.intValue)
388392
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,19 @@ object AkkaContainerClient {
201201
result
202202
}
203203

204+
/** A helper method to post one single request to a connection. Used for container tests. */
205+
def postForJsArray(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
206+
implicit logging: Logging,
207+
as: ActorSystem,
208+
ec: ExecutionContext,
209+
tid: TransactionId): (Int, Option[JsArray]) = {
210+
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
211+
val response = executeRequestForJsArray(connection, endPoint, content)
212+
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
213+
connection.close()
214+
result
215+
}
216+
204217
/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
205218
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)(
206219
implicit logging: Logging,
@@ -233,4 +246,24 @@ object AkkaContainerClient {
233246

234247
res
235248
}
249+
250+
private def executeRequestForJsArray(connection: AkkaContainerClient, endpoint: String, content: JsValue)(
251+
implicit logging: Logging,
252+
as: ActorSystem,
253+
ec: ExecutionContext,
254+
tid: TransactionId): Future[(Int, Option[JsArray])] = {
255+
256+
val res = connection
257+
.post(endpoint, content, true)
258+
.map({
259+
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption)
260+
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
261+
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
262+
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
263+
throw new java.util.concurrent.TimeoutException()
264+
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
265+
})
266+
267+
res
268+
}
236269
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import akka.util.ByteString
2626
import pureconfig._
2727
import pureconfig.generic.auto._
2828
import spray.json.DefaultJsonProtocol._
29-
import spray.json.JsObject
29+
import spray.json.{JsObject, JsValue}
3030
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
3131
import org.apache.openwhisk.core.ConfigKeys
3232
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
@@ -159,7 +159,7 @@ trait Container {
159159
}
160160

161161
/** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
162-
def run(parameters: JsObject,
162+
def run(parameters: JsValue,
163163
environment: JsObject,
164164
timeout: FiniteDuration,
165165
maxConcurrent: Int,

common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,8 @@ class ElasticSearchActivationStore(
388388
restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation]
389389
}
390390

391-
private def restoreAnnotations(js: JsObject): JsObject = {
392-
val annotations = js.fields
391+
private def restoreAnnotations(js: JsValue): JsObject = {
392+
val annotations = js.asJsObject.fields
393393
.get("annotations")
394394
.map { anno =>
395395
Try {
@@ -399,10 +399,10 @@ class ElasticSearchActivationStore(
399399
}.getOrElse(JsArray.empty)
400400
}
401401
.getOrElse(JsArray.empty)
402-
JsObject(js.fields.updated("annotations", annotations))
402+
JsObject(js.asJsObject.fields.updated("annotations", annotations))
403403
}
404404

405-
private def restoreResponse(js: JsObject): JsObject = {
405+
private def restoreResponse(js: JsObject): JsValue = {
406406
val response = js.fields
407407
.get("response")
408408
.map { res =>
@@ -412,7 +412,10 @@ class ElasticSearchActivationStore(
412412
.get("result")
413413
.map { r =>
414414
val JsString(data) = r
415-
data.parseJson.asJsObject
415+
data.parseJson match {
416+
case JsArray(elements) => JsArray(elements)
417+
case _ => data.parseJson.asJsObject
418+
}
416419
}
417420
.getOrElse(JsObject.empty)
418421
JsObject(temp.updated("result", result))

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
111111
* NOTE: the code is application error (since this response could be used as a response for the sequence
112112
* if the payload contains an error)
113113
*/
114-
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload)
114+
protected[core] def payloadPlaceholder(payload: Option[JsValue]) = ActivationResponse(ApplicationError, payload)
115115

116116
/**
117117
* Class of errors for invoker-container communication.
@@ -203,7 +203,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
203203
truncated match {
204204
case None =>
205205
val sizeOpt = Option(str).map(_.length)
206-
Try { str.parseJson.asJsObject } match {
206+
Try { str.parseJson } match {
207207
case scala.util.Success(result @ JsObject(fields)) =>
208208
// If the response is a JSON object container an error field, accept it as the response error.
209209
val errorOpt = fields.get(ERROR_FIELD)
@@ -222,6 +222,17 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
222222
developerError(errorContent, sizeOpt)
223223
}
224224

225+
case scala.util.Success(result @ JsArray(_)) =>
226+
if (res.okStatus) {
227+
success(Some(result), sizeOpt)
228+
} else {
229+
// Any non-200 code is treated as a container failure. We still need to check whether
230+
// there was a useful error message in there.
231+
val errorContent = invalidRunResponse(str).toJson
232+
//developerErrorWithLog(errorContent, sizeOpt, None)
233+
developerError(errorContent, sizeOpt)
234+
}
235+
225236
case scala.util.Success(notAnObj) =>
226237
// This should affect only blackbox containers, since our own containers should already test for that.
227238
developerError(invalidRunResponse(str), sizeOpt)

common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ object Messages {
204204
}
205205

206206
def invalidRunResponse(actualResponse: String) = {
207-
"The action did not produce a valid JSON response" + {
207+
"The action did not produce a valid JSON or JSON Array response" + {
208208
Option(actualResponse) filter { _.nonEmpty } map { s =>
209209
s": $s"
210210
} getOrElse "."

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,12 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
289289
complete(Accepted, activationId.toJsObject)
290290
}
291291
case Success(Right(activation)) =>
292-
val response = if (result) activation.resultAsJson else activation.toExtendedJson()
293-
292+
val response = activation.response.result match {
293+
case Some(JsArray(elements)) =>
294+
JsArray(elements)
295+
case _ =>
296+
if (result) activation.resultAsJson else activation.toExtendedJson()
297+
}
294298
respondWithActivationIdHeader(activation.activationId) {
295299
if (activation.response.isSuccess) {
296300
complete(OK, response)

core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PostActionActivation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc
4747
protected[controller] def invokeAction(
4848
user: Identity,
4949
action: WhiskActionMetaData,
50-
payload: Option[JsObject],
50+
payload: Option[JsValue],
5151
waitForResponse: Option[FiniteDuration],
5252
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
5353
action.toExecutableWhiskAction match {

core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ protected[actions] trait PrimitiveActions {
8080
user: Identity,
8181
action: WhiskActionMetaData,
8282
components: Vector[FullyQualifiedEntityName],
83-
payload: Option[JsObject],
83+
payload: Option[JsValue],
8484
waitForOutermostResponse: Option[FiniteDuration],
8585
cause: Option[ActivationId],
8686
topmost: Boolean,
@@ -109,7 +109,7 @@ protected[actions] trait PrimitiveActions {
109109
protected[actions] def invokeSingleAction(
110110
user: Identity,
111111
action: ExecutableWhiskActionMetaData,
112-
payload: Option[JsObject],
112+
payload: Option[JsValue],
113113
waitForResponse: Option[FiniteDuration],
114114
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
115115

@@ -152,12 +152,16 @@ protected[actions] trait PrimitiveActions {
152152
private def invokeSimpleAction(
153153
user: Identity,
154154
action: ExecutableWhiskActionMetaData,
155-
payload: Option[JsObject],
155+
payload: Option[JsValue],
156156
waitForResponse: Option[FiniteDuration],
157157
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
158158

159159
// merge package parameters with action (action parameters supersede), then merge in payload
160-
val args = action.parameters merge payload
160+
val args: Option[JsValue] = payload match {
161+
case Some(JsObject(fields)) => action.parameters merge Some(JsObject(fields))
162+
case Some(JsArray(elements)) => Some(JsArray(elements))
163+
case _ => Some(action.parameters.toJsObject)
164+
}
161165
val activationId = activationIdFactory.make()
162166

163167
val startActivation = transid.started(
@@ -169,6 +173,10 @@ protected[actions] trait PrimitiveActions {
169173
val startLoadbalancer =
170174
transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")
171175

176+
val keySet = payload match {
177+
case Some(JsObject(fields)) => Some(fields.keySet)
178+
case _ => None
179+
}
172180
val message = ActivationMessage(
173181
transid,
174182
FullyQualifiedEntityName(action.namespace, action.name, Some(action.version), action.binding),
@@ -179,7 +187,7 @@ protected[actions] trait PrimitiveActions {
179187
waitForResponse.isDefined,
180188
args,
181189
action.parameters.initParameters,
182-
action.parameters.lockedParameters(payload.map(_.fields.keySet).getOrElse(Set.empty)),
190+
action.parameters.lockedParameters(keySet.getOrElse(Set.empty)),
183191
cause = cause,
184192
WhiskTracerProvider.tracer.getTraceContext(transid))
185193

@@ -271,7 +279,7 @@ protected[actions] trait PrimitiveActions {
271279
*/
272280
private def invokeComposition(user: Identity,
273281
action: ExecutableWhiskActionMetaData,
274-
payload: Option[JsObject],
282+
payload: Option[JsValue],
275283
waitForResponse: Option[FiniteDuration],
276284
cause: Option[ActivationId],
277285
accounting: Option[CompositionAccounting] = None)(
@@ -319,7 +327,7 @@ protected[actions] trait PrimitiveActions {
319327
* @param parentTid a parent transaction id
320328
*/
321329
private def invokeConductor(user: Identity,
322-
payload: Option[JsObject],
330+
payload: Option[JsValue],
323331
session: Session,
324332
parentTid: TransactionId): Future[ActivationResponse] = {
325333

@@ -330,9 +338,13 @@ protected[actions] trait PrimitiveActions {
330338
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
331339
} else {
332340
// inject state into payload if any
333-
val params = session.state
334-
.map(state => Some(JsObject(payload.getOrElse(JsObject.empty).fields ++ state.fields)))
335-
.getOrElse(payload)
341+
val params: Option[JsValue] = payload match {
342+
case Some(JsObject(fields)) =>
343+
session.state
344+
.map(state => Some(JsObject(JsObject(fields).fields ++ state.fields)))
345+
.getOrElse(payload)
346+
case _ => None
347+
}
336348

337349
// invoke conductor action
338350
session.accounting.conductors += 1

core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected[actions] trait SequenceActions {
7171
protected[actions] def invokeAction(
7272
user: Identity,
7373
action: WhiskActionMetaData,
74-
payload: Option[JsObject],
74+
payload: Option[JsValue],
7575
waitForResponse: Option[FiniteDuration],
7676
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]]
7777

@@ -93,7 +93,7 @@ protected[actions] trait SequenceActions {
9393
user: Identity,
9494
action: WhiskActionMetaData,
9595
components: Vector[FullyQualifiedEntityName],
96-
payload: Option[JsObject],
96+
payload: Option[JsValue],
9797
waitForOutermostResponse: Option[FiniteDuration],
9898
cause: Option[ActivationId],
9999
topmost: Boolean,
@@ -266,7 +266,7 @@ protected[actions] trait SequenceActions {
266266
user: Identity,
267267
seqAction: WhiskActionMetaData,
268268
seqActivationId: ActivationId,
269-
inputPayload: Option[JsObject],
269+
inputPayload: Option[JsValue],
270270
components: Vector[FullyQualifiedEntityName],
271271
cause: Option[ActivationId],
272272
atomicActionCnt: Int)(implicit transid: TransactionId): Future[SequenceAccounting] = {
@@ -347,7 +347,12 @@ protected[actions] trait SequenceActions {
347347
// the accounting no longer needs to hold a reference to it once the action is
348348
// invoked, so previousResponse.getAndSet(null) drops the reference at this point
349349
// which prevents dragging the previous response for the lifetime of the next activation
350-
val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
350+
val previousResult = accounting.previousResponse.getAndSet(null).result
351+
val inputPayload: Option[JsValue] = previousResult match {
352+
case Some(JsObject(fields)) => Some(JsObject(fields))
353+
case Some(JsArray(elements)) => Some(JsArray(elements))
354+
case _ => None
355+
}
351356

352357
// invoke the action by calling the right method depending on whether it's an atomic action or a sequence
353358
val futureWhiskActivationTuple = action.toExecutableWhiskAction match {
@@ -460,9 +465,10 @@ protected[actions] case class SequenceAccounting(atomicActionCnt: Int,
460465
// check conditions on payload that may lead to interrupting the execution of the sequence
461466
// short-circuit the execution of the sequence iff the payload contains an error field
462467
// and is the result of an action return, not the initial payload
463-
val outputPayload = activation.response.result.map(_.asJsObject)
464-
val payloadContent = outputPayload getOrElse JsObject.empty
465-
val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
468+
val errorField: Option[JsValue] = activation.response.result match {
469+
case Some(JsObject(fields)) => fields.get(ActivationResponse.ERROR_FIELD)
470+
case _ => None
471+
}
466472
val withinSeqLimit = newCnt <= maxSequenceCnt
467473

468474
if (withinSeqLimit && errorField.isEmpty) {

0 commit comments

Comments
 (0)