Skip to content

Commit e7ad9c3

Browse files
josefkarasekJorTurFer
authored andcommitted
[BUG-5922] Report failing ScaledJob triggers in status (kedacore#5916)
Signed-off-by: Josef Karasek <[email protected]>
1 parent b9a02cd commit e7ad9c3

File tree

6 files changed

+52
-23
lines changed

6 files changed

+52
-23
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ Here is an overview of all new **experimental** features:
7777
### Fixes
7878

7979
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
80+
- **General**: Check for missing CRD references and sample CRs ([#5920](https://github.com/kedacore/keda/issues/5920))
81+
- **General**: Scalers are properly closed after being refreshed ([#5806](https://github.com/kedacore/keda/issues/5806))
82+
- **MongoDB Scaler**: MongoDB url parses correctly `+srv` scheme ([#5760](https://github.com/kedacore/keda/issues/5760))
83+
- **New Relic Scaler**: Fix CVE-2024-6104 in github.com/hashicorp/go-retryablehttp ([#5944](https://github.com/kedacore/keda/issues/5944))
84+
- **ScaledJob**: Fix ScaledJob ignores failing trigger(s) error ([#5922](https://github.com/kedacore/keda/issues/5922))
8085

8186
### Deprecations
8287

pkg/mock/mock_scaling/mock_executor/mock_interface.go

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/scaling/executor/scale_executor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const (
3939

4040
// ScaleExecutor contains methods RequestJobScale and RequestScale
4141
type ScaleExecutor interface {
42-
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
42+
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, isError bool, scaleTo int64, maxScale int64)
4343
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions)
4444
}
4545

pkg/scaling/executor/scale_jobs.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const (
3838
defaultFailedJobsHistoryLimit = int32(100)
3939
)
4040

41-
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
41+
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive, isError bool, scaleTo int64, maxScale int64) {
4242
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)
4343

4444
runningJobCount := e.getRunningJobCount(ctx, scaledJob)
@@ -65,6 +65,19 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
6565
logger.V(1).Info("No change in activity")
6666
}
6767

68+
if isError {
69+
// some triggers responded with error
70+
// Set ScaledJob.Status.ReadyCondition to Unknown
71+
readyCondition := scaledJob.Status.Conditions.GetReadyCondition()
72+
msg := "Some triggers defined in ScaledJob are not working correctly"
73+
logger.V(1).Info(msg)
74+
if !readyCondition.IsUnknown() {
75+
if err := e.setReadyCondition(ctx, logger, scaledJob, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil {
76+
logger.Error(err, "error setting ready condition")
77+
}
78+
}
79+
}
80+
6881
condition := scaledJob.Status.Conditions.GetActiveCondition()
6982
if condition.IsUnknown() || condition.IsTrue() != isActive {
7083
if isActive {

pkg/scaling/scale_handler.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
259259
return
260260
}
261261

262-
isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
263-
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
262+
isActive, isError, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
263+
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, isError, scaleTo, maxScale)
264264
}
265265
}
266266

@@ -816,15 +816,16 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
816816

817817
// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
818818
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
819-
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
819+
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) ([]scaledjob.ScalerMetrics, bool) {
820820
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
821821

822822
cache, err := h.GetScalersCache(ctx, scaledJob)
823823
metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
824824
if err != nil {
825825
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
826-
return nil
826+
return nil, true
827827
}
828+
var isError bool
828829
var scalersMetrics []scaledjob.ScalerMetrics
829830
scalers, scalerConfigs := cache.GetScalers()
830831
for scalerIndex, scaler := range scalers {
@@ -852,8 +853,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
852853
metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency)
853854
}
854855
if err != nil {
855-
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
856+
scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue")
856857
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
858+
isError = true
857859
continue
858860
}
859861
if isTriggerActive {
@@ -886,21 +888,21 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
886888
metricscollector.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
887889
}
888890
}
889-
return scalersMetrics
891+
return scalersMetrics, isError
890892
}
891893

892894
// isScaledJobActive returns whether the input ScaledJob:
893895
// is active as the first return value,
894896
// the second and the third return values indicate queueLength and maxValue for scale
895-
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
897+
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, bool, int64, int64) {
896898
logger := logf.Log.WithName("scalemetrics")
897899

898-
scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
900+
scalersMetrics, isError := h.getScaledJobMetrics(ctx, scaledJob)
899901
isActive, queueLength, maxValue, maxFloatValue :=
900902
scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())
901903

902904
logger.V(1).WithValues("scaledJob.Name", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)
903-
return isActive, queueLength, maxValue
905+
return isActive, isError, queueLength, maxValue
904906
}
905907

906908
// getTrueMetricArray is a help function made for composite scaler to determine

pkg/scaling/scale_handler_test.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -661,19 +661,21 @@ func TestIsScaledJobActive(t *testing.T) {
661661
scalerCachesLock: &sync.RWMutex{},
662662
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
663663
}
664-
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
664+
// nosemgrep: context-todo
665+
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
665666
assert.Equal(t, true, isActive)
667+
assert.Equal(t, false, isError)
666668
assert.Equal(t, int64(20), queueLength)
667669
assert.Equal(t, int64(10), maxValue)
668670
scalerCache.Close(context.Background())
669671

670672
// Test the valiation
671673
scalerTestDatam := []scalerTestData{
672-
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20),
673-
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2),
674-
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9),
675-
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27),
676-
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25),
674+
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 20, 20),
675+
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 5, 2),
676+
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 12, 9),
677+
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 27),
678+
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 25),
677679
}
678680

679681
for index, scalerTestData := range scalerTestDatam {
@@ -717,9 +719,11 @@ func TestIsScaledJobActive(t *testing.T) {
717719
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
718720
}
719721
fmt.Printf("index: %d", index)
720-
isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
722+
// nosemgrep: context-todo
723+
isActive, isError, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
721724
// assert.Equal(t, 5, index)
722725
assert.Equal(t, scalerTestData.ResultIsActive, isActive)
726+
assert.Equal(t, scalerTestData.ResultIsError, isError)
723727
assert.Equal(t, scalerTestData.ResultQueueLength, queueLength)
724728
assert.Equal(t, scalerTestData.ResultMaxValue, maxValue)
725729
scalerCache.Close(context.Background())
@@ -757,8 +761,10 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
757761
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
758762
}
759763

760-
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
764+
// nosemgrep: context-todo
765+
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
761766
assert.Equal(t, true, isActive)
767+
assert.Equal(t, false, isError)
762768
assert.Equal(t, int64(0), queueLength)
763769
assert.Equal(t, int64(0), maxValue)
764770
scalerCache.Close(context.Background())
@@ -781,6 +787,7 @@ func newScalerTestData(
781787
scaler4AverageValue int, //nolint:golint,unparam
782788
scaler4IsActive bool, //nolint:golint,unparam
783789
resultIsActive bool, //nolint:golint,unparam
790+
resultIsError bool, //nolint:golint,unparam
784791
resultQueueLength,
785792
resultMaxLength int) scalerTestData {
786793
return scalerTestData{
@@ -800,6 +807,7 @@ func newScalerTestData(
800807
Scaler4AverageValue: int64(scaler4AverageValue),
801808
Scaler4IsActive: scaler4IsActive,
802809
ResultIsActive: resultIsActive,
810+
ResultIsError: resultIsError,
803811
ResultQueueLength: int64(resultQueueLength),
804812
ResultMaxValue: int64(resultMaxLength),
805813
}
@@ -822,6 +830,7 @@ type scalerTestData struct {
822830
Scaler4AverageValue int64
823831
Scaler4IsActive bool
824832
ResultIsActive bool
833+
ResultIsError bool
825834
ResultQueueLength int64
826835
ResultMaxValue int64
827836
MinReplicaCount int32

0 commit comments

Comments
 (0)