Skip to content

feat(collector): timestamps and offset fields #2431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions collector/benthos/input/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

const (
fieldPrometheusURL = "url"
fieldPrometheusQueries = "queries"
fieldPrometheusSchedule = "schedule"
fieldPrometheusURL = "url"
fieldPrometheusQueries = "queries"
fieldPrometheusSchedule = "schedule"
fieldPrometheusQueryOffset = "query_offset"
)

func prometheusInputConfig() *service.ConfigSpec {
Expand All @@ -40,17 +41,21 @@ func prometheusInputConfig() *service.ConfigSpec {
).Description("List of PromQL queries to execute."),
service.NewStringField(fieldPrometheusSchedule).
Description("The cron expression to use for the scrape job.").
Examples("*/30 * * * * *", "@every 30s").
Default("*/30 * * * * *"),
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 30 seconds.", `
Examples("0 * * * * *", "@every 1m").
Default("0 * * * * *"),
service.NewDurationField(fieldPrometheusQueryOffset).
Description("Indicates how far back in time the scraping should be done to account for delays in metric availability.").
Default("0s"),
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute and a scrape offset of 30 seconds to account for delays in metric availability.", `
input:
prometheus:
url: "${PROMETHEUS_URL:http://localhost:9090}"
schedule: "* * * * *"
schedule: "0 * * * * *"
query_offset: "1m"
queries:
- query:
name: "node_cpu_usage"
promql: "sum(rate(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
promql: "sum(increase(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
`)
}

Expand Down Expand Up @@ -78,14 +83,15 @@ type QueryResult struct {
var _ service.BatchInput = (*prometheusInput)(nil)

type prometheusInput struct {
logger *service.Logger
client v1.API
queries []PromQuery
interval time.Duration
schedule string
scheduler gocron.Scheduler
store map[time.Time][]QueryResult
mu sync.Mutex
logger *service.Logger
client v1.API
queries []PromQuery
interval time.Duration
schedule string
queryOffset time.Duration
scheduler gocron.Scheduler
store map[time.Time][]QueryResult
mu sync.Mutex
}

func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*prometheusInput, error) {
Expand All @@ -99,6 +105,11 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
return nil, err
}

queryOffset, err := conf.FieldDuration(fieldPrometheusQueryOffset)
if err != nil {
return nil, err
}

// Parse queries
queriesConf, err := conf.FieldObjectList(fieldPrometheusQueries)
if err != nil {
Expand Down Expand Up @@ -160,28 +171,38 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
}

return &prometheusInput{
logger: logger,
client: v1.NewAPI(client),
queries: queries,
interval: interval,
schedule: schedule,
scheduler: scheduler,
store: make(map[time.Time][]QueryResult),
mu: sync.Mutex{},
logger: logger,
client: v1.NewAPI(client),
queries: queries,
interval: interval,
schedule: schedule,
queryOffset: queryOffset,
scheduler: scheduler,
store: make(map[time.Time][]QueryResult),
mu: sync.Mutex{},
}, nil
}

// scrape executes the PromQL queries and stores the results.
func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
in.logger.Debugf("executing PromQL queries at %s", t.Format(time.RFC3339))
// Convert time to UTC
t = t.UTC()

// Apply the metrics scrape offset
queryTime := t.Add(-in.queryOffset)

in.logger.Debugf("executing PromQL queries at %s (using query time %s with offset %s)",
t.Format(time.RFC3339),
queryTime.Format(time.RFC3339),
in.queryOffset)

results := make([]QueryResult, 0, len(in.queries))

for _, query := range in.queries {
in.logger.Tracef("executing query: %s", query.PromQL)

// Execute the PromQL query
result, warnings, err := in.client.Query(ctx, query.PromQL, t)
// Execute the PromQL query with the offset applied time
result, warnings, err := in.client.Query(ctx, query.PromQL, queryTime)
if err != nil {
in.logger.Errorf("error executing query %s: %v", query.PromQL, err)
return err
Expand All @@ -203,7 +224,7 @@ func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
results = append(results, QueryResult{
Name: query.Name,
Query: query.PromQL,
Timestamp: t,
Timestamp: queryTime,
Values: vector,
})
}
Expand Down Expand Up @@ -266,6 +287,7 @@ func (in *prometheusInput) ReadBatch(ctx context.Context) (service.MessageBatch,
msg := service.NewMessage(encoded)
msg.MetaSet("scrape_time", t.Format(time.RFC3339))
msg.MetaSet("scrape_interval", in.interval.String())
msg.MetaSet("query_offset", in.queryOffset.String())
msg.MetaSet("query_name", result.Name)
batch = append(batch, msg)
}
Expand Down
61 changes: 34 additions & 27 deletions collector/benthos/input/run_ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
fieldResourceType = "resource_type"
fieldMetrics = "metrics"
fieldSchedule = "schedule"
fieldMetricsScrapeOffset = "metrics_scrape_offset"
fieldMetricsOffset = "metrics_offset"
fieldHTTPConfig = "http"
fieldHTTPTimeout = "timeout"
fieldHTTPRetryCount = "retry_count"
Expand Down Expand Up @@ -67,7 +67,7 @@ func runAIInputConfig() *service.ConfigSpec {
Description("The cron expression to use for the scrape job.").
Examples("*/30 * * * * *", "@every 30s").
Default("*/30 * * * * *"),
service.NewDurationField(fieldMetricsScrapeOffset).
service.NewDurationField(fieldMetricsOffset).
Description("Indicates how far back in time the scraping window should start to account for delays in metric availability.").
Default("0s"),
service.NewObjectField(fieldHTTPConfig,
Expand Down Expand Up @@ -126,16 +126,16 @@ func init() {
var _ service.BatchInput = (*runAIInput)(nil)

type runAIInput struct {
logger *service.Logger
service *runai.Service
resourceType string
metrics []runai.MetricType
interval time.Duration
schedule string
metricsScrapeOffset time.Duration
scheduler gocron.Scheduler
store map[time.Time][]runai.ResourceWithMetrics
mu sync.Mutex
logger *service.Logger
service *runai.Service
resourceType string
metrics []runai.MetricType
interval time.Duration
schedule string
metricsOffset time.Duration
scheduler gocron.Scheduler
store map[time.Time][]runai.ResourceWithMetrics
mu sync.Mutex
}

func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) {
Expand Down Expand Up @@ -169,7 +169,7 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
return nil, err
}

metricsScrapeOffset, err := conf.FieldDuration(fieldMetricsScrapeOffset)
metricsOffset, err := conf.FieldDuration(fieldMetricsOffset)
if err != nil {
return nil, err
}
Expand All @@ -183,8 +183,8 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
return nil, err
}

// Get current time
now := time.Now()
// Get current time in UTC
now := time.Now().UTC()

// Get next two occurrences
nextRun := cronSchedule.Next(now)
Expand Down Expand Up @@ -230,13 +230,13 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
}

return &runAIInput{
logger: logger,
service: service,
resourceType: resourceType,
interval: interval,
schedule: schedule,
metricsScrapeOffset: metricsScrapeOffset,
scheduler: scheduler,
logger: logger,
service: service,
resourceType: resourceType,
interval: interval,
schedule: schedule,
metricsOffset: metricsOffset,
scheduler: scheduler,
metrics: lo.Map(metrics, func(metric string, _ int) runai.MetricType {
return runai.MetricType(metric)
}),
Expand All @@ -249,12 +249,15 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
in.logger.Debugf("scraping %s metrics between %s and %s", in.resourceType, t.Add(-in.interval).Format(time.RFC3339), t.Format(time.RFC3339))

startTime := t.Add(-in.interval).Add(-in.metricsOffset)
endTime := t.Add(-in.metricsOffset)

switch in.resourceType {
case "workload":
workloadsWithMetrics, err := in.service.GetAllWorkloadWithMetrics(ctx, runai.MeasurementParams{
MetricType: in.metrics,
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
EndTime: t.Add(-in.metricsScrapeOffset),
StartTime: startTime,
EndTime: endTime,
})
if err != nil {
return err
Expand All @@ -268,8 +271,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
case "pod":
podsWithMetrics, err := in.service.GetAllPodWithMetrics(ctx, runai.MeasurementParams{
MetricType: in.metrics,
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
EndTime: t.Add(-in.metricsScrapeOffset),
StartTime: startTime,
EndTime: endTime,
})
if err != nil {
return err
Expand All @@ -291,7 +294,8 @@ func (in *runAIInput) Connect(ctx context.Context) error {
gocron.CronJob(in.schedule, true),
gocron.NewTask(
func(ctx context.Context) error {
t := time.Now()
// Get current time in UTC
t := time.Now().UTC()
err := in.scrape(ctx, t)
if err != nil {
in.logger.Errorf("error scraping metrics: %v", err)
Expand Down Expand Up @@ -336,6 +340,9 @@ func (in *runAIInput) ReadBatch(ctx context.Context) (service.MessageBatch, serv
msg := service.NewMessage(encoded)
msg.MetaSet("scrape_time", t.Format(time.RFC3339))
msg.MetaSet("scrape_interval", in.interval.String())
msg.MetaSet("metrics_offset", in.metricsOffset.String())
msg.MetaSet("resource_type", in.resourceType)
msg.MetaSet("metrics_time", resourceWithMetrics.GetMetrics().Timestamp.Format(time.RFC3339))
batch = append(batch, msg)
}

Expand Down
20 changes: 10 additions & 10 deletions collector/benthos/input/runai/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Service) GetWorkloadMetrics(ctx context.Context, workloadID string, par

for _, measurement := range result.Measurements {
if len(measurement.Values) > 0 {
v, err := strconv.ParseFloat(measurement.Values[0].Value, 64)
v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64)
if err != nil {
return m, fmt.Errorf("failed to parse metric value: %w", err)
}
Expand Down Expand Up @@ -152,22 +152,22 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem
workloadsWithMetrics := make([]WorkloadWithMetrics, len(workloads))
for i, workload := range workloads {
metrics := Metrics{
Timestamp: params.StartTime,
Timestamp: params.StartTime.UTC(),
Values: make(map[MetricType]float64),
}

// We chunk the metric types to not exceed the max number of metrics per request
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
m, err := s.GetWorkloadMetrics(ctx, workload.ID, MeasurementParams{
MetricType: metricTypes,
StartTime: params.StartTime,
EndTime: params.EndTime,
StartTime: params.StartTime.UTC(),
EndTime: params.EndTime.UTC(),
})
if err != nil {
return nil, err
}

metrics.Timestamp = m.Timestamp
metrics.Timestamp = m.Timestamp.UTC()
for mt, v := range m.Values {
metrics.Values[mt] = v
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *Service) GetPodMetrics(ctx context.Context, workloadID string, podID st

for _, measurement := range result.Measurements {
if len(measurement.Values) > 0 {
v, err := strconv.ParseFloat(measurement.Values[0].Value, 64)
v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64)
if err != nil {
return m, fmt.Errorf("failed to parse metric value: %w", err)
}
Expand Down Expand Up @@ -289,22 +289,22 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa
podsWithMetrics := make([]PodWithMetrics, len(pods))
for i, pod := range pods {
metrics := Metrics{
Timestamp: params.StartTime,
Timestamp: params.StartTime.UTC(),
Values: make(map[MetricType]float64),
}

// We chunk the metric types to not exceed the max number of metrics per request
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
m, err := s.GetPodMetrics(ctx, pod.WorkloadID, pod.ID, MeasurementParams{
MetricType: metricTypes,
StartTime: params.StartTime,
EndTime: params.EndTime,
StartTime: params.StartTime.UTC(),
EndTime: params.EndTime.UTC(),
})
if err != nil {
return nil, err
}

metrics.Timestamp = m.Timestamp
metrics.Timestamp = m.Timestamp.UTC()
for mt, v := range m.Values {
metrics.Values[mt] = v
}
Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/input/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (in *scheduleInput) ReadBatch(ctx context.Context) (service.MessageBatch, s

select {
case v := <-in.timer.C:
t = v
t = v.UTC()
default:
return nil, func(context.Context, error) error { return nil }, nil
}
Expand Down