Skip to content

Commit daeadbf

Browse files
bdoyle0182Brendan Doyle
andauthored
Fix missing attachment stuck actions (#5355)
* init * compile * scalafmt * fix test compilation Co-authored-by: Brendan Doyle <[email protected]>
1 parent 8578887 commit daeadbf

File tree

7 files changed

+34
-23
lines changed

7 files changed

+34
-23
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
164164
db: ArtifactStore[Wsuper],
165165
doc: DocId,
166166
rev: DocRevision = DocRevision.empty,
167-
fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
167+
fromCache: Boolean = cacheEnabled,
168+
ignoreMissingAttachment: Boolean = false)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
168169
implicit val logger = db.logging
169170
implicit val ec = db.executionContext
170171
val key = doc.asDocInfo(rev)

common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
2121
import java.nio.charset.StandardCharsets.UTF_8
2222
import java.time.Instant
2323
import java.util.Base64
24-
2524
import akka.http.scaladsl.model.ContentTypes
2625

2726
import scala.concurrent.ExecutionContext
@@ -30,9 +29,7 @@ import scala.util.{Failure, Success, Try}
3029
import spray.json._
3130
import spray.json.DefaultJsonProtocol._
3231
import org.apache.openwhisk.common.TransactionId
33-
import org.apache.openwhisk.core.database.ArtifactStore
34-
import org.apache.openwhisk.core.database.DocumentFactory
35-
import org.apache.openwhisk.core.database.CacheChangeNotification
32+
import org.apache.openwhisk.core.database.{ArtifactStore, CacheChangeNotification, DocumentFactory, NoDocumentException}
3633
import org.apache.openwhisk.core.entity.Attachments._
3734
import org.apache.openwhisk.core.entity.types.EntityStore
3835

@@ -421,11 +418,13 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
421418
}
422419

423420
// overridden to retrieve attached code
424-
override def get[A >: WhiskAction](
425-
db: ArtifactStore[A],
426-
doc: DocId,
427-
rev: DocRevision = DocRevision.empty,
428-
fromCache: Boolean)(implicit transid: TransactionId, mw: Manifest[WhiskAction]): Future[WhiskAction] = {
421+
override def get[A >: WhiskAction](db: ArtifactStore[A],
422+
doc: DocId,
423+
rev: DocRevision = DocRevision.empty,
424+
fromCache: Boolean,
425+
ignoreMissingAttachment: Boolean = false)(
426+
implicit transid: TransactionId,
427+
mw: Manifest[WhiskAction]): Future[WhiskAction] = {
429428

430429
implicit val ec = db.executionContext
431430

@@ -439,6 +438,9 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
439438
val newAction = a.copy(exec = exec.inline(boas.toByteArray))
440439
newAction.revision(a.rev)
441440
newAction
441+
}).recover({
442+
case _: NoDocumentException if ignoreMissingAttachment =>
443+
action
442444
})
443445
}
444446

core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ trait WriteOps extends Directives {
320320
// marker to return an existing doc with status OK rather than conflict if overwrite is false
321321
case class IdentityPut(self: A) extends Throwable
322322

323-
onComplete(factory.get(datastore, docid) flatMap { doc =>
323+
onComplete(factory.get(datastore, docid, ignoreMissingAttachment = overwrite) flatMap { doc =>
324324
if (overwrite) {
325325
logging.debug(this, s"[PUT] entity exists, will try to update '$doc'")
326326
update(doc).map(updatedDoc => (Some(doc), updatedDoc))
@@ -392,7 +392,7 @@ trait WriteOps extends Directives {
392392
format: RootJsonFormat[A],
393393
notifier: Option[CacheChangeNotification],
394394
ma: Manifest[A]) = {
395-
onComplete(factory.get(datastore, docid) flatMap { entity =>
395+
onComplete(factory.get(datastore, docid, ignoreMissingAttachment = true) flatMap { entity =>
396396
confirm(entity) flatMap {
397397
case _ =>
398398
factory.del(datastore, entity.docinfo) map { _ =>

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class FunctionPullingContainerProxy(
188188
Option[ExecutableWhiskAction]) => Future[Container],
189189
entityStore: ArtifactStore[WhiskEntity],
190190
namespaceBlacklist: NamespaceBlacklist,
191-
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
191+
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) => Future[WhiskAction],
192192
dataManagementService: ActorRef,
193193
clientProxyFactory: (ActorRefFactory,
194194
String,
@@ -786,7 +786,7 @@ class FunctionPullingContainerProxy(
786786
whenUnhandled {
787787
case Event(PingCache, data: WarmData) =>
788788
val actionId = data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
789-
get(entityStore, actionId.id, actionId.rev, true).map(_ => {
789+
get(entityStore, actionId.id, actionId.rev, true, false).map(_ => {
790790
logging.debug(
791791
this,
792792
s"Refreshed function cache for action ${data.action} from container ${data.container.containerId}.")
@@ -913,7 +913,7 @@ class FunctionPullingContainerProxy(
913913
if (actionid.rev == DocRevision.empty)
914914
logging.warn(this, s"revision was not provided for ${actionid.id}")
915915

916-
get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty)
916+
get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty, false)
917917
.flatMap { action =>
918918
action.toExecutableWhiskAction match {
919919
case Some(executable) =>
@@ -1264,7 +1264,7 @@ object FunctionPullingContainerProxy {
12641264
Option[ExecutableWhiskAction]) => Future[Container],
12651265
entityStore: ArtifactStore[WhiskEntity],
12661266
namespaceBlacklist: NamespaceBlacklist,
1267-
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
1267+
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) => Future[WhiskAction],
12681268
dataManagementService: ActorRef,
12691269
clientProxyFactory: (ActorRefFactory,
12701270
String,

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: Fini
6363

6464
class QueueManager(
6565
entityStore: ArtifactStore[WhiskEntity],
66-
getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
66+
getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
67+
DocId,
68+
DocRevision,
69+
Boolean,
70+
Boolean) => Future[WhiskActionMetaData],
6771
etcdClient: EtcdClient,
6872
schedulerEndpoints: SchedulerEndpoints,
6973
schedulerId: SchedulerInstanceId,
@@ -340,7 +344,7 @@ class QueueManager(
340344
private def recoverQueue(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
341345
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
342346
logging.info(this, s"Recover a queue for ${msg.action},")
343-
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
347+
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false, false)
344348
.map { actionMetaData: WhiskActionMetaData =>
345349
actionMetaData.toExecutableWhiskAction match {
346350
case Some(_) =>
@@ -370,7 +374,7 @@ class QueueManager(
370374

371375
logging.info(this, s"Create a new queue for ${newAction}, rev: ${msg.revision}")
372376

373-
getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty)
377+
getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty, false)
374378
.map { actionMetaData: WhiskActionMetaData =>
375379
actionMetaData.toExecutableWhiskAction match {
376380
// Always use revision got from Database, there can be 2 cases for the actionMetaData.rev
@@ -668,7 +672,11 @@ object QueueManager {
668672

669673
def props(
670674
entityStore: ArtifactStore[WhiskEntity],
671-
getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
675+
getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
676+
DocId,
677+
DocRevision,
678+
Boolean,
679+
Boolean) => Future[WhiskActionMetaData],
672680
etcdClient: EtcdClient,
673681
schedulerEndpoints: SchedulerEndpoints,
674682
schedulerId: SchedulerInstanceId,

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class FunctionPullingContainerProxyTests
209209

210210
/** get WhiskAction*/
211211
def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction {
212-
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
212+
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: Boolean) =>
213213
response
214214
}
215215

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class QueueManagerTests
154154

155155
/**get WhiskActionMetaData*/
156156
def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) = LoggedFunction {
157-
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
157+
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: Boolean) =>
158158
meta
159159
}
160160

@@ -496,7 +496,7 @@ class QueueManagerTests
496496
val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
497497
val finalRevision = DocRevision("3-test-revision")
498498
// simulate the case that action is updated again while fetch it from database
499-
def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean) = {
499+
def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean, ignoreMissingAttachment: Boolean) = {
500500
if (docRevision == DocRevision.empty) {
501501
Future(convertToMetaData(action.copy(version = SemVer(0, 0, 3)).toWhiskAction.revision(finalRevision)))
502502
} else

0 commit comments

Comments
 (0)