Skip to content

Adds aggregation across metrics for failed/succeeded and non completed stages #1558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L)
val emptyNodeNames = Seq.empty[String]
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
Expand Down Expand Up @@ -358,13 +356,12 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
}

/**
* Aggregates the SparkMetrics by stage. This is an internal method to populate the cached metrics
* Aggregates the SparkMetrics by completed stage information.
* This is an internal method to populate the cached metrics
* to be used by other aggregators.
* @param index AppIndex (used by the profiler tool)
*/
private def aggregateSparkMetricsByStageInternal(index: Int): Unit = {
// TODO: this has stage attempts. we should handle different attempts

// For Photon apps, peak memory and shuffle write time need to be calculated from accumulators
// instead of task metrics.
// Approach:
Expand Down Expand Up @@ -447,7 +444,15 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perStageRec.swBytesWrittenSum,
perStageRec.swRecordsWrittenSum,
perStageRec.swWriteTimeSum) // converted to milliseconds by the aggregator
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
// This logic is to handle the case where there are multiple attempts for a stage.
// We check if the StageLevelCache already has a row for the stage.
// If yes, we aggregate the metrics of the new row with the existing row.
// If no, we just store the new row.
val rowToStore = stageLevelSparkMetrics(index)
.get(sm.stageInfo.stageId)
.map(_.aggregateStageProfileMetric(stageRow))
.getOrElse(stageRow)
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, rowToStore)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,65 @@ case class StageAggTaskMetricsProfileResult(
swRecordsWrittenSum: Long,
swWriteTimeSum: Long // milliseconds
) extends BaseJobStageAggTaskMetricsProfileResult {

/**
 * Combines two StageAggTaskMetricsProfileResults for the same stage.
 * This method aggregates the metrics from the current instance and the provided `other` instance.
 *
 * Detailed explanation ->
 * 1. A stage can have two successful attempts.
 * 2. We store both of those attempt information using the StageManager
 * 3. During aggregation, we combine the metrics for a stage at a stageID
 *    level
 * 4. For combining aggregated information for multiple stage attempts, we combine the
 *    aggregated per attempt information into one using the below method
 *
 * Sums are added together, maxima/minima are merged with max/min, and the average
 * duration is recomputed as a task-count-weighted mean: a plain mean of the two
 * averages would be biased whenever the attempts ran different numbers of tasks.
 *
 * @param other The StageAggTaskMetricsProfileResult to be combined with the current instance.
 * @return A new StageAggTaskMetricsProfileResult with aggregated metrics.
 */
def aggregateStageProfileMetric(
    other: StageAggTaskMetricsProfileResult
): StageAggTaskMetricsProfileResult = {
  val combinedNumTasks = this.numTasks + other.numTasks
  // Weight each attempt's average by its task count; guard the zero-task case to
  // avoid a division by zero when neither attempt reported any tasks.
  val combinedDurationAvg =
    if (combinedNumTasks == 0) {
      0
    } else {
      (this.durationAvg * this.numTasks + other.durationAvg * other.numTasks) / combinedNumTasks
    }
  StageAggTaskMetricsProfileResult(
    appIndex = this.appIndex,
    id = this.id,
    numTasks = combinedNumTasks,
    duration = Option(this.duration.getOrElse(0L) + other.duration.getOrElse(0L)),
    diskBytesSpilledSum = this.diskBytesSpilledSum + other.diskBytesSpilledSum,
    durationSum = this.durationSum + other.durationSum,
    durationMax = Math.max(this.durationMax, other.durationMax),
    durationMin = Math.min(this.durationMin, other.durationMin),
    durationAvg = combinedDurationAvg,
    executorCPUTimeSum = this.executorCPUTimeSum + other.executorCPUTimeSum,
    executorDeserializeCpuTimeSum = this.executorDeserializeCpuTimeSum +
      other.executorDeserializeCpuTimeSum,
    executorDeserializeTimeSum = this.executorDeserializeTimeSum +
      other.executorDeserializeTimeSum,
    executorRunTimeSum = this.executorRunTimeSum + other.executorRunTimeSum,
    inputBytesReadSum = this.inputBytesReadSum + other.inputBytesReadSum,
    inputRecordsReadSum = this.inputRecordsReadSum + other.inputRecordsReadSum,
    jvmGCTimeSum = this.jvmGCTimeSum + other.jvmGCTimeSum,
    memoryBytesSpilledSum = this.memoryBytesSpilledSum + other.memoryBytesSpilledSum,
    outputBytesWrittenSum = this.outputBytesWrittenSum + other.outputBytesWrittenSum,
    outputRecordsWrittenSum = this.outputRecordsWrittenSum + other.outputRecordsWrittenSum,
    peakExecutionMemoryMax = Math.max(this.peakExecutionMemoryMax, other.peakExecutionMemoryMax),
    resultSerializationTimeSum = this.resultSerializationTimeSum +
      other.resultSerializationTimeSum,
    resultSizeMax = Math.max(this.resultSizeMax, other.resultSizeMax),
    srFetchWaitTimeSum = this.srFetchWaitTimeSum + other.srFetchWaitTimeSum,
    srLocalBlocksFetchedSum = this.srLocalBlocksFetchedSum + other.srLocalBlocksFetchedSum,
    srRemoteBlocksFetchSum = this.srRemoteBlocksFetchSum + other.srRemoteBlocksFetchSum,
    srRemoteBytesReadSum = this.srRemoteBytesReadSum + other.srRemoteBytesReadSum,
    srRemoteBytesReadToDiskSum = this.srRemoteBytesReadToDiskSum +
      other.srRemoteBytesReadToDiskSum,
    srTotalBytesReadSum = this.srTotalBytesReadSum + other.srTotalBytesReadSum,
    srcLocalBytesReadSum = this.srcLocalBytesReadSum + other.srcLocalBytesReadSum,
    swBytesWrittenSum = this.swBytesWrittenSum + other.swBytesWrittenSum,
    swRecordsWrittenSum = this.swRecordsWrittenSum + other.swRecordsWrittenSum,
    swWriteTimeSum = this.swWriteTimeSum + other.swWriteTimeSum
  )
}

// Rows produced by this result type are keyed by stage id in reports.
override def idHeader: String = "stageId"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
// Parse task accumulables
for (res <- event.taskInfo.accumulables) {
try {
app.accumManager.addAccToTask(event.stageId, event.taskInfo.taskId, res)
app.accumManager.addAccToTask(event.stageId, res)
} catch {
case NonFatal(e) =>
logWarning("Exception when parsing accumulables on task-completed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ class AccumInfo(val infoRef: AccumMetaRef) {
* attempt information, which gives no stats at the accumulable level
*
* @param stageId The ID of the stage containing the task
* @param taskId The ID of the completed task
* @param accumulableInfo Accumulator information from the TaskEnd event
*/
def addAccumToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
def addAccumToTask(stageId: Int, accumulableInfo: AccumulableInfo): Unit = {
// 1. We first extract the incoming task update value
// 2. Then allocate a new Statistic metric object with min,max as incoming update
// 3. Use count to calculate rolling average
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ class AccumManager {
accumInfoRef.addAccumToStage(stageId, accumulableInfo)
}

def addAccToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
/**
 * Records a task-level accumulator update, creating the backing AccumInfo
 * entry the first time this accumulator id is seen.
 *
 * @param stageId id of the stage the completed task belongs to
 * @param accumulableInfo accumulator update reported for the task
 */
def addAccToTask(stageId: Int, accumulableInfo: AccumulableInfo): Unit = {
  getOrCreateAccumInfo(accumulableInfo.id, accumulableInfo.name)
    .addAccumToTask(stageId, accumulableInfo)
}

def getAccStageIds(id: Long): Set[Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,23 @@ class StageModel private(sInfo: StageInfo) {
ProfileUtils.optionLongMinusOptionLong(stageInfo.completionTime, stageInfo.submissionTime)
}

/**
 * Indicates whether this stage attempt failed.
 * A stage may be retried several times; earlier attempts can fail before a
 * later attempt succeeds, and a failed attempt carries a failure reason.
 *
 * @return true when this stage attempt recorded a failure reason
 */
def hasFailed: Boolean = stageInfo.failureReason.nonEmpty

/**
 * Extracts the failure reason attached to this stage attempt, if any.
 * A populated failure reason is the definitive indicator of a failed stage.
 *
 * @return the failure reason when the stage failed, otherwise an empty string
 */
def getFailureReason: String = {
  stageInfo.failureReason.fold("")(identity)
}
Expand Down