diff --git a/collector/benthos/input/prometheus.go b/collector/benthos/input/prometheus.go index e715341e1..330961db1 100644 --- a/collector/benthos/input/prometheus.go +++ b/collector/benthos/input/prometheus.go @@ -15,9 +15,10 @@ import ( ) const ( - fieldPrometheusURL = "url" - fieldPrometheusQueries = "queries" - fieldPrometheusSchedule = "schedule" + fieldPrometheusURL = "url" + fieldPrometheusQueries = "queries" + fieldPrometheusSchedule = "schedule" + fieldPrometheusQueryOffset = "query_offset" ) func prometheusInputConfig() *service.ConfigSpec { @@ -40,17 +41,21 @@ func prometheusInputConfig() *service.ConfigSpec { ).Description("List of PromQL queries to execute."), service.NewStringField(fieldPrometheusSchedule). Description("The cron expression to use for the scrape job."). - Examples("*/30 * * * * *", "@every 30s"). - Default("*/30 * * * * *"), - ).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 30 seconds.", ` + Examples("0 * * * * *", "@every 1m"). + Default("0 * * * * *"), + service.NewDurationField(fieldPrometheusQueryOffset). + Description("Indicates how far back in time the scraping should be done to account for delays in metric availability."). + Default("0s"), + ).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute and a scrape offset of 30 seconds to account for delays in metric availability.", ` input: prometheus: url: "${PROMETHEUS_URL:http://localhost:9090}" - schedule: "* * * * *" + schedule: "0 * * * * *" + query_offset: "1m" queries: - query: name: "node_cpu_usage" - promql: "sum(rate(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)" + promql: "sum(increase(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)" `) } @@ -78,14 +83,15 @@ type QueryResult struct { var _ service.BatchInput = (*prometheusInput)(nil) type prometheusInput struct { - logger *service.Logger - client v1.API - queries []PromQuery - interval time.Duration - schedule string - scheduler gocron.Scheduler - store map[time.Time][]QueryResult - mu sync.Mutex + logger *service.Logger + client v1.API + queries []PromQuery + interval time.Duration + schedule string + queryOffset time.Duration + scheduler gocron.Scheduler + store map[time.Time][]QueryResult + mu sync.Mutex } func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*prometheusInput, error) { @@ -99,6 +105,11 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr return nil, err } + queryOffset, err := conf.FieldDuration(fieldPrometheusQueryOffset) + if err != nil { + return nil, err + } + // Parse queries queriesConf, err := conf.FieldObjectList(fieldPrometheusQueries) if err != nil { @@ -160,28 +171,38 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr } return &prometheusInput{ - logger: logger, - client: v1.NewAPI(client), - queries: queries, - interval: interval, - schedule: schedule, - scheduler: scheduler, - store: make(map[time.Time][]QueryResult), - mu: sync.Mutex{}, + logger: logger, + client: v1.NewAPI(client), + queries: queries, + interval: interval, + schedule: schedule, + queryOffset: queryOffset, + scheduler: scheduler, + store: make(map[time.Time][]QueryResult), + mu: sync.Mutex{}, }, nil } // scrape executes the PromQL queries and stores the results. func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error { - in.logger.Debugf("executing PromQL queries at %s", t.Format(time.RFC3339)) + // Convert time to UTC + t = t.UTC() + + // Apply the metrics scrape offset + queryTime := t.Add(-in.queryOffset) + + in.logger.Debugf("executing PromQL queries at %s (using query time %s with offset %s)", + t.Format(time.RFC3339), + queryTime.Format(time.RFC3339), + in.queryOffset) results := make([]QueryResult, 0, len(in.queries)) for _, query := range in.queries { in.logger.Tracef("executing query: %s", query.PromQL) - // Execute the PromQL query - result, warnings, err := in.client.Query(ctx, query.PromQL, t) + // Execute the PromQL query with the offset applied time + result, warnings, err := in.client.Query(ctx, query.PromQL, queryTime) if err != nil { in.logger.Errorf("error executing query %s: %v", query.PromQL, err) return err @@ -203,7 +224,7 @@ func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error { results = append(results, QueryResult{ Name: query.Name, Query: query.PromQL, - Timestamp: t, + Timestamp: queryTime, Values: vector, }) } @@ -266,6 +287,7 @@ func (in *prometheusInput) ReadBatch(ctx context.Context) (service.MessageBatch, msg := service.NewMessage(encoded) msg.MetaSet("scrape_time", t.Format(time.RFC3339)) msg.MetaSet("scrape_interval", in.interval.String()) + msg.MetaSet("query_offset", in.queryOffset.String()) msg.MetaSet("query_name", result.Name) batch = append(batch, msg) } diff --git a/collector/benthos/input/run_ai.go b/collector/benthos/input/run_ai.go index b9422ef22..173d2029b 100644 --- a/collector/benthos/input/run_ai.go +++ b/collector/benthos/input/run_ai.go @@ -23,7 +23,7 @@ const ( fieldResourceType = "resource_type" fieldMetrics = "metrics" fieldSchedule = "schedule" - fieldMetricsScrapeOffset = "metrics_scrape_offset" + fieldMetricsOffset = "metrics_offset" fieldHTTPConfig = "http" fieldHTTPTimeout = "timeout" fieldHTTPRetryCount = "retry_count" @@ -67,7 +67,7 @@ func runAIInputConfig() *service.ConfigSpec { Description("The cron expression to use for the scrape job."). Examples("*/30 * * * * *", "@every 30s"). Default("*/30 * * * * *"), - service.NewDurationField(fieldMetricsScrapeOffset). + service.NewDurationField(fieldMetricsOffset). Description("Indicates how far back in time the scraping window should start to account for delays in metric availability."). Default("0s"), service.NewObjectField(fieldHTTPConfig, @@ -126,16 +126,16 @@ func init() { var _ service.BatchInput = (*runAIInput)(nil) type runAIInput struct { - logger *service.Logger - service *runai.Service - resourceType string - metrics []runai.MetricType - interval time.Duration - schedule string - metricsScrapeOffset time.Duration - scheduler gocron.Scheduler - store map[time.Time][]runai.ResourceWithMetrics - mu sync.Mutex + logger *service.Logger + service *runai.Service + resourceType string + metrics []runai.MetricType + interval time.Duration + schedule string + metricsOffset time.Duration + scheduler gocron.Scheduler + store map[time.Time][]runai.ResourceWithMetrics + mu sync.Mutex } func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) { @@ -169,7 +169,7 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn return nil, err } - metricsScrapeOffset, err := conf.FieldDuration(fieldMetricsScrapeOffset) + metricsOffset, err := conf.FieldDuration(fieldMetricsOffset) if err != nil { return nil, err } @@ -183,8 +183,8 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn return nil, err } - // Get current time - now := time.Now() + // Get current time in UTC + now := time.Now().UTC() // Get next two occurrences nextRun := cronSchedule.Next(now) @@ -230,13 +230,13 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn } return &runAIInput{ - logger: logger, - service: service, - resourceType: resourceType, - interval: interval, - schedule: schedule, - metricsScrapeOffset: metricsScrapeOffset, - scheduler: scheduler, + logger: logger, + service: service, + resourceType: resourceType, + interval: interval, + schedule: schedule, + metricsOffset: metricsOffset, + scheduler: scheduler, metrics: lo.Map(metrics, func(metric string, _ int) runai.MetricType { return runai.MetricType(metric) }), @@ -249,12 +249,15 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn func (in *runAIInput) scrape(ctx context.Context, t time.Time) error { in.logger.Debugf("scraping %s metrics between %s and %s", in.resourceType, t.Add(-in.interval).Format(time.RFC3339), t.Format(time.RFC3339)) + startTime := t.Add(-in.interval).Add(-in.metricsOffset) + endTime := t.Add(-in.metricsOffset) + switch in.resourceType { case "workload": workloadsWithMetrics, err := in.service.GetAllWorkloadWithMetrics(ctx, runai.MeasurementParams{ MetricType: in.metrics, - StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset), - EndTime: t.Add(-in.metricsScrapeOffset), + StartTime: startTime, + EndTime: endTime, }) if err != nil { return err @@ -268,8 +271,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error { case "pod": podsWithMetrics, err := in.service.GetAllPodWithMetrics(ctx, runai.MeasurementParams{ MetricType: in.metrics, - StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset), - EndTime: t.Add(-in.metricsScrapeOffset), + StartTime: startTime, + EndTime: endTime, }) if err != nil { return err @@ -291,7 +294,8 @@ func (in *runAIInput) Connect(ctx context.Context) error { gocron.CronJob(in.schedule, true), gocron.NewTask( func(ctx context.Context) error { - t := time.Now() + // Get current time in UTC + t := time.Now().UTC() err := in.scrape(ctx, t) if err != nil { in.logger.Errorf("error scraping metrics: %v", err) @@ -336,6 +340,9 @@ func (in *runAIInput) ReadBatch(ctx context.Context) (service.MessageBatch, serv msg := service.NewMessage(encoded) msg.MetaSet("scrape_time", t.Format(time.RFC3339)) msg.MetaSet("scrape_interval", in.interval.String()) + msg.MetaSet("metrics_offset", in.metricsOffset.String()) + msg.MetaSet("resource_type", in.resourceType) + msg.MetaSet("metrics_time", resourceWithMetrics.GetMetrics().Timestamp.Format(time.RFC3339)) batch = append(batch, msg) } diff --git a/collector/benthos/input/runai/metrics.go b/collector/benthos/input/runai/metrics.go index d2c7e6b34..88454a887 100644 --- a/collector/benthos/input/runai/metrics.go +++ b/collector/benthos/input/runai/metrics.go @@ -103,7 +103,7 @@ func (s *Service) GetWorkloadMetrics(ctx context.Context, workloadID string, par for _, measurement := range result.Measurements { if len(measurement.Values) > 0 { - v, err := strconv.ParseFloat(measurement.Values[0].Value, 64) + v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64) if err != nil { return m, fmt.Errorf("failed to parse metric value: %w", err) } @@ -152,7 +152,7 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem workloadsWithMetrics := make([]WorkloadWithMetrics, len(workloads)) for i, workload := range workloads { metrics := Metrics{ - Timestamp: params.StartTime, + Timestamp: params.StartTime.UTC(), Values: make(map[MetricType]float64), } @@ -160,14 +160,14 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem for _, metricTypes := range lo.Chunk(params.MetricType, 9) { m, err := s.GetWorkloadMetrics(ctx, workload.ID, MeasurementParams{ MetricType: metricTypes, - StartTime: params.StartTime, - EndTime: params.EndTime, + StartTime: params.StartTime.UTC(), + EndTime: params.EndTime.UTC(), }) if err != nil { return nil, err } - metrics.Timestamp = m.Timestamp + metrics.Timestamp = m.Timestamp.UTC() for mt, v := range m.Values { metrics.Values[mt] = v } @@ -240,7 +240,7 @@ func (s *Service) GetPodMetrics(ctx context.Context, workloadID string, podID st for _, measurement := range result.Measurements { if len(measurement.Values) > 0 { - v, err := strconv.ParseFloat(measurement.Values[0].Value, 64) + v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64) if err != nil { return m, fmt.Errorf("failed to parse metric value: %w", err) } @@ -289,7 +289,7 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa podsWithMetrics := make([]PodWithMetrics, len(pods)) for i, pod := range pods { metrics := Metrics{ - Timestamp: params.StartTime, + Timestamp: params.StartTime.UTC(), Values: make(map[MetricType]float64), } @@ -297,14 +297,14 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa for _, metricTypes := range lo.Chunk(params.MetricType, 9) { m, err := s.GetPodMetrics(ctx, pod.WorkloadID, pod.ID, MeasurementParams{ MetricType: metricTypes, - StartTime: params.StartTime, - EndTime: params.EndTime, + StartTime: params.StartTime.UTC(), + EndTime: params.EndTime.UTC(), }) if err != nil { return nil, err } - metrics.Timestamp = m.Timestamp + metrics.Timestamp = m.Timestamp.UTC() for mt, v := range m.Values { metrics.Values[mt] = v } diff --git a/collector/benthos/input/schedule.go b/collector/benthos/input/schedule.go index cd54d0485..3321c26b1 100644 --- a/collector/benthos/input/schedule.go +++ b/collector/benthos/input/schedule.go @@ -66,7 +66,7 @@ func (in *scheduleInput) ReadBatch(ctx context.Context) (service.MessageBatch, s select { case v := <-in.timer.C: - t = v + t = v.UTC() default: return nil, func(context.Context, error) error { return nil }, nil }