Skip to content

Fix missing attachment stuck actions #5355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
db: ArtifactStore[Wsuper],
doc: DocId,
rev: DocRevision = DocRevision.empty,
fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
fromCache: Boolean = cacheEnabled,
ignoreMissingAttachment: Boolean = false)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
implicit val logger = db.logging
implicit val ec = db.executionContext
val key = doc.asDocInfo(rev)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.time.Instant
import java.util.Base64

import akka.http.scaladsl.model.ContentTypes

import scala.concurrent.ExecutionContext
Expand All @@ -30,9 +29,7 @@ import scala.util.{Failure, Success, Try}
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.ArtifactStore
import org.apache.openwhisk.core.database.DocumentFactory
import org.apache.openwhisk.core.database.CacheChangeNotification
import org.apache.openwhisk.core.database.{ArtifactStore, CacheChangeNotification, DocumentFactory, NoDocumentException}
import org.apache.openwhisk.core.entity.Attachments._
import org.apache.openwhisk.core.entity.types.EntityStore

Expand Down Expand Up @@ -421,11 +418,13 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
}

// overridden to retrieve attached code
override def get[A >: WhiskAction](
db: ArtifactStore[A],
doc: DocId,
rev: DocRevision = DocRevision.empty,
fromCache: Boolean)(implicit transid: TransactionId, mw: Manifest[WhiskAction]): Future[WhiskAction] = {
override def get[A >: WhiskAction](db: ArtifactStore[A],
doc: DocId,
rev: DocRevision = DocRevision.empty,
fromCache: Boolean,
ignoreMissingAttachment: Boolean = false)(
implicit transid: TransactionId,
mw: Manifest[WhiskAction]): Future[WhiskAction] = {

implicit val ec = db.executionContext

Expand All @@ -439,6 +438,9 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
val newAction = a.copy(exec = exec.inline(boas.toByteArray))
newAction.revision(a.rev)
newAction
}).recover({
case _: NoDocumentException if ignoreMissingAttachment =>
action
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ trait WriteOps extends Directives {
// marker to return an existing doc with status OK rather than conflict if overwrite is false
case class IdentityPut(self: A) extends Throwable

onComplete(factory.get(datastore, docid) flatMap { doc =>
onComplete(factory.get(datastore, docid, ignoreMissingAttachment = overwrite) flatMap { doc =>
if (overwrite) {
logging.debug(this, s"[PUT] entity exists, will try to update '$doc'")
update(doc).map(updatedDoc => (Some(doc), updatedDoc))
Expand Down Expand Up @@ -392,7 +392,7 @@ trait WriteOps extends Directives {
format: RootJsonFormat[A],
notifier: Option[CacheChangeNotification],
ma: Manifest[A]) = {
onComplete(factory.get(datastore, docid) flatMap { entity =>
onComplete(factory.get(datastore, docid, ignoreMissingAttachment = true) flatMap { entity =>
confirm(entity) flatMap {
case _ =>
factory.del(datastore, entity.docinfo) map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class FunctionPullingContainerProxy(
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) => Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
Expand Down Expand Up @@ -786,7 +786,7 @@ class FunctionPullingContainerProxy(
whenUnhandled {
case Event(PingCache, data: WarmData) =>
val actionId = data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
get(entityStore, actionId.id, actionId.rev, true).map(_ => {
get(entityStore, actionId.id, actionId.rev, true, false).map(_ => {
logging.debug(
this,
s"Refreshed function cache for action ${data.action} from container ${data.container.containerId}.")
Expand Down Expand Up @@ -913,7 +913,7 @@ class FunctionPullingContainerProxy(
if (actionid.rev == DocRevision.empty)
logging.warn(this, s"revision was not provided for ${actionid.id}")

get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty)
get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty, false)
.flatMap { action =>
action.toExecutableWhiskAction match {
case Some(executable) =>
Expand Down Expand Up @@ -1264,7 +1264,7 @@ object FunctionPullingContainerProxy {
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) => Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: Fini

class QueueManager(
entityStore: ArtifactStore[WhiskEntity],
getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
DocId,
DocRevision,
Boolean,
Boolean) => Future[WhiskActionMetaData],
etcdClient: EtcdClient,
schedulerEndpoints: SchedulerEndpoints,
schedulerId: SchedulerInstanceId,
Expand Down Expand Up @@ -340,7 +344,7 @@ class QueueManager(
private def recoverQueue(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
logging.info(this, s"Recover a queue for ${msg.action},")
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false, false)
.map { actionMetaData: WhiskActionMetaData =>
actionMetaData.toExecutableWhiskAction match {
case Some(_) =>
Expand Down Expand Up @@ -370,7 +374,7 @@ class QueueManager(

logging.info(this, s"Create a new queue for ${newAction}, rev: ${msg.revision}")

getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty)
getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty, false)
.map { actionMetaData: WhiskActionMetaData =>
actionMetaData.toExecutableWhiskAction match {
// Always use revision got from Database, there can be 2 cases for the actionMetaData.rev
Expand Down Expand Up @@ -668,7 +672,11 @@ object QueueManager {

def props(
entityStore: ArtifactStore[WhiskEntity],
getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
DocId,
DocRevision,
Boolean,
Boolean) => Future[WhiskActionMetaData],
etcdClient: EtcdClient,
schedulerEndpoints: SchedulerEndpoints,
schedulerId: SchedulerInstanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class FunctionPullingContainerProxyTests

/** get WhiskAction*/
def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction {
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: Boolean) =>
response
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class QueueManagerTests

/**get WhiskActionMetaData*/
def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) = LoggedFunction {
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: Boolean) =>
meta
}

Expand Down Expand Up @@ -496,7 +496,7 @@ class QueueManagerTests
val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
val finalRevision = DocRevision("3-test-revision")
// simulate the case that action is updated again while fetch it from database
def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean) = {
def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean, ignoreMissingAttachment: Boolean) = {
if (docRevision == DocRevision.empty) {
Future(convertToMetaData(action.copy(version = SemVer(0, 0, 3)).toWhiskAction.revision(finalRevision)))
} else
Expand Down