Adds aggregation across metrics for failed/succeeded and non completed stages #1558

Merged
14 commits, merged Mar 10, 2025
@@ -320,9 +320,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L)
val emptyNodeNames = Seq.empty[String]
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
app.stageManager.getAllSuccessfulStageAttempts.map { sm =>

Reviewer comment:

For sm.duration, can we output both all stages and successful stage attempts? It's quite common to have failed stages in in-product runs, and for duration we want to see that cost.

Collaborator (Author) reply:

I think a better way to add this would be another view that incorporates both failed and successful tasks/stages. Doing that segregation at the metric level does not seem like a good idea, and it would require creating another view. We can take care of this in a separate issue/PR.

val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
@@ -358,13 +356,12 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
}

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* Aggregates the SparkMetrics by completed stage information.
* This is an internal method to populate the cached metrics
* to be used by other aggregators.
* @param index AppIndex (used by the profiler tool)
*/
private def aggregateSparkMetricsByStageInternal(index: Int): Unit = {
// TODO: this has stage attempts. we should handle different attempts

// For Photon apps, peak memory and shuffle write time need to be calculated from accumulators
// instead of task metrics.
// Approach:
@@ -392,7 +389,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
}
}

app.stageManager.getAllStages.foreach { sm =>
app.stageManager.getAllSuccessfulStageAttempts.foreach { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
@@ -447,7 +444,15 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perStageRec.swBytesWrittenSum,
perStageRec.swRecordsWrittenSum,
perStageRec.swWriteTimeSum) // converted to milliseconds by the aggregator
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
// This logic is to handle the case where there are multiple attempts for a stage.
// We check if the StageLevelCache already has a row for the stage.
// If yes, we aggregate the metrics of the new row with the existing row.
// If no, we just store the new row.
val rowToStore = stageLevelSparkMetrics(index)
.get(sm.stageInfo.stageId)
.map(_.aggregateStageProfileMetric(stageRow))
.getOrElse(stageRow)
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, rowToStore)
}
}
}
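The merge-or-store pattern used above can be sketched in isolation. `StageRow`, `merge`, and `putRow` below are hypothetical, trimmed-down stand-ins for the tool's `StageAggTaskMetricsProfileResult`, `aggregateStageProfileMetric`, and the cache update, reduced to three fields for brevity:

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down stand-in for the per-stage metrics row.
case class StageRow(stageId: Int, numTasks: Long, durationSum: Long) {
  def merge(other: StageRow): StageRow =
    StageRow(stageId, numTasks + other.numTasks, durationSum + other.durationSum)
}

// Cache keyed by stageId: a second successful attempt merges into the stored row.
val cache = mutable.HashMap.empty[Int, StageRow]

def putRow(row: StageRow): Unit = {
  // If a row for this stageId exists, aggregate with it; otherwise store as-is.
  val toStore = cache.get(row.stageId).map(_.merge(row)).getOrElse(row)
  cache.put(row.stageId, toStore)
}
```

Because the cache is keyed by stageId rather than (stageId, attempt), every successful attempt folds into a single row, which is exactly what the PR's `rowToStore` logic does.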
@@ -685,6 +685,65 @@ case class StageAggTaskMetricsProfileResult(
swRecordsWrittenSum: Long,
swWriteTimeSum: Long // milliseconds
) extends BaseJobStageAggTaskMetricsProfileResult {

/**
* Combines two StageAggTaskMetricsProfileResults for the same stage.
* This method aggregates the metrics from the current instance and the provided `other` instance.
*
* Detailed explanation:
* 1. A stage can have two successful attempts.
* 2. We store information for both attempts using the StageManager.
* 3. During aggregation, we combine the metrics for a stage at the stageId level.
* 4. To combine aggregated information across multiple stage attempts, we merge the
* per-attempt aggregates into one using this method.
*
* @param other The StageAggTaskMetricsProfileResult to be combined with the current instance.
* @return A new StageAggTaskMetricsProfileResult with aggregated metrics.
*/
def aggregateStageProfileMetric(
other: StageAggTaskMetricsProfileResult
): StageAggTaskMetricsProfileResult = {
StageAggTaskMetricsProfileResult(
appIndex = this.appIndex,
id = this.id,
numTasks = this.numTasks + other.numTasks,
duration = Option(this.duration.getOrElse(0L) + other.duration.getOrElse(0L)),
diskBytesSpilledSum = this.diskBytesSpilledSum + other.diskBytesSpilledSum,
durationSum = this.durationSum + other.durationSum,
durationMax = Math.max(this.durationMax, other.durationMax),
durationMin = Math.min(this.durationMin, other.durationMin),
// Weight each attempt's average by its task count; a plain mean of the
// two averages would overweight the smaller attempt.
durationAvg = if (this.numTasks + other.numTasks == 0) 0.0
else (this.durationAvg * this.numTasks + other.durationAvg * other.numTasks) /
(this.numTasks + other.numTasks),
executorCPUTimeSum = this.executorCPUTimeSum + other.executorCPUTimeSum,
executorDeserializeCpuTimeSum = this.executorDeserializeCpuTimeSum +
other.executorDeserializeCpuTimeSum,
executorDeserializeTimeSum = this.executorDeserializeTimeSum +
other.executorDeserializeTimeSum,
executorRunTimeSum = this.executorRunTimeSum + other.executorRunTimeSum,
inputBytesReadSum = this.inputBytesReadSum + other.inputBytesReadSum,
inputRecordsReadSum = this.inputRecordsReadSum + other.inputRecordsReadSum,
jvmGCTimeSum = this.jvmGCTimeSum + other.jvmGCTimeSum,
memoryBytesSpilledSum = this.memoryBytesSpilledSum + other.memoryBytesSpilledSum,
outputBytesWrittenSum = this.outputBytesWrittenSum + other.outputBytesWrittenSum,
outputRecordsWrittenSum = this.outputRecordsWrittenSum + other.outputRecordsWrittenSum,
peakExecutionMemoryMax = Math.max(this.peakExecutionMemoryMax, other.peakExecutionMemoryMax),
resultSerializationTimeSum = this.resultSerializationTimeSum +
other.resultSerializationTimeSum,
resultSizeMax = Math.max(this.resultSizeMax, other.resultSizeMax),
srFetchWaitTimeSum = this.srFetchWaitTimeSum + other.srFetchWaitTimeSum,
srLocalBlocksFetchedSum = this.srLocalBlocksFetchedSum + other.srLocalBlocksFetchedSum,
srRemoteBlocksFetchSum = this.srRemoteBlocksFetchSum + other.srRemoteBlocksFetchSum,
srRemoteBytesReadSum = this.srRemoteBytesReadSum + other.srRemoteBytesReadSum,
srRemoteBytesReadToDiskSum = this.srRemoteBytesReadToDiskSum +
other.srRemoteBytesReadToDiskSum,
srTotalBytesReadSum = this.srTotalBytesReadSum + other.srTotalBytesReadSum,
srcLocalBytesReadSum = this.srcLocalBytesReadSum + other.srcLocalBytesReadSum,
swBytesWrittenSum = this.swBytesWrittenSum + other.swBytesWrittenSum,
swRecordsWrittenSum = this.swRecordsWrittenSum + other.swRecordsWrittenSum,
swWriteTimeSum = this.swWriteTimeSum + other.swWriteTimeSum
)
}

override def idHeader = "stageId"
}
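One wrinkle worth noting: for an average field like `durationAvg`, a plain mean of the two attempts' averages is only correct when both attempts ran the same number of tasks. A standalone arithmetic sketch (plain Scala, no tool classes, illustrative numbers):

```scala
// Attempt A: 3 tasks averaging 10 ms; attempt B: 1 task averaging 50 ms.
val (tasksA, avgA) = (3L, 10.0)
val (tasksB, avgB) = (1L, 50.0)

// Plain mean of the two averages overweights the smaller attempt.
val naive = (avgA + avgB) / 2

// Task-weighted mean recovers the true per-task average across both attempts.
val weighted = (avgA * tasksA + avgB * tasksB) / (tasksA + tasksB)
```

Here `naive` is 30.0 while the true per-task average over all four tasks is `weighted` = 20.0.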

@@ -369,7 +369,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
// Parse task accumulables
for (res <- event.taskInfo.accumulables) {
try {
app.accumManager.addAccToTask(event.stageId, event.taskInfo.taskId, res)
app.accumManager.addAccToTask(event.stageId, res)
} catch {
case NonFatal(e) =>
logWarning("Exception when parsing accumulables on task-completed "
@@ -84,10 +84,9 @@ class AccumInfo(val infoRef: AccumMetaRef) {
* attempt information, which gives no stats at the accumulable level
*
* @param stageId The ID of the stage containing the task
* @param taskId The ID of the completed task
* @param accumulableInfo Accumulator information from the TaskEnd event
*/
def addAccumToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
def addAccumToTask(stageId: Int, accumulableInfo: AccumulableInfo): Unit = {
// 1. We first extract the incoming task update value
// 2. Then allocate a new Statistic metric object with min,max as incoming update
// 3. Use count to calculate rolling average
@@ -40,9 +40,9 @@ class AccumManager {
accumInfoRef.addAccumToStage(stageId, accumulableInfo)
}

def addAccToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
def addAccToTask(stageId: Int, accumulableInfo: AccumulableInfo): Unit = {
val accumInfoRef = getOrCreateAccumInfo(accumulableInfo.id, accumulableInfo.name)
accumInfoRef.addAccumToTask(stageId, taskId, accumulableInfo)
accumInfoRef.addAccumToTask(stageId, accumulableInfo)
}

def getAccStageIds(id: Long): Set[Int] = {
@@ -69,10 +69,35 @@ class StageModel private(sInfo: StageInfo) {
ProfileUtils.optionLongMinusOptionLong(stageInfo.completionTime, stageInfo.submissionTime)
}

/**
* Returns true if this stage attempt has failed.
* A stage can have multiple attempts (retries); attempts may keep failing
* until a final attempt succeeds.
*
* @return true if the stage attempt has failed.
*/
def hasFailed: Boolean = {
stageInfo.failureReason.isDefined
}

/**
* Returns true if this stage attempt has completed.
* Note that a failed attempt can still be completed, so completion does not
* imply success; it only means that completionTime is set.
*
* @return true if completionTime is set
*/
def hasCompleted: Boolean = {
stageInfo.completionTime.isDefined
}

/**
* Returns the failure reason if the stage attempt has failed.
* A set failureReason is the definitive indicator of a failed attempt.
*
* @return the failure reason if the stage has failed, or an empty string otherwise
*/
def getFailureReason: String = {
stageInfo.failureReason.getOrElse("")
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,6 +53,22 @@ class StageModelManager extends Logging {
*/
def getAllStages: Iterable[StageModel] = stageIdToInfo.values.flatMap(_.values)

/**
* Retrieves all successful stage attempts created while handling
* StageSubmitted/StageCompleted events. A stage attempt is considered successful if it
* completed execution and did not fail; failed and incomplete attempts are filtered out.
* Note that a stage can have multiple successful attempts.
*
* @return An Iterable collection of all successful StageModel instances, where each StageModel
* represents a successful stage attempt.
*/
def getAllSuccessfulStageAttempts: Iterable[StageModel] = {
stageIdToInfo.values.flatMap(_.values).filter(stage => {
stage.hasCompleted && !stage.hasFailed
})
}
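The success filter can be exercised standalone. `Attempt` here is a hypothetical stand-in for `StageModel`; in the real code, `completionTime` and `failureReason` come from Spark's `StageInfo`:

```scala
// Hypothetical stand-in for StageModel, keeping only the two Option fields
// that drive hasCompleted / hasFailed.
case class Attempt(stageId: Int, completionTime: Option[Long], failureReason: Option[String]) {
  def hasCompleted: Boolean = completionTime.isDefined
  def hasFailed: Boolean = failureReason.isDefined
}

val attempts = Seq(
  Attempt(1, Some(100L), Some("FetchFailed")), // completed, but failed: filtered out
  Attempt(1, Some(200L), None),                // successful retry: kept
  Attempt(2, None, None)                       // never completed: filtered out
)

// Mirrors getAllSuccessfulStageAttempts: completed and not failed.
val successful = attempts.filter(a => a.hasCompleted && !a.hasFailed)
```

This illustrates why both predicates are needed: a failed attempt still has a completion time, so `hasCompleted` alone would keep it.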

/**
* Returns all Ids of stage objects created as a result of handling StageSubmitted/StageCompleted.
*