Skip to content

Commit 363729d

Browse files
authored
feat(collector): timestamps and offset fields (#2431)
1 parent 59e49c2 commit 363729d

File tree

4 files changed

+95

-66

lines changed

collector/benthos/input/prometheus.go

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ import (
1515
)
1616

1717
const (
18-
fieldPrometheusURL = "url"
19-
fieldPrometheusQueries = "queries"
20-
fieldPrometheusSchedule = "schedule"
18+
fieldPrometheusURL = "url"
19+
fieldPrometheusQueries = "queries"
20+
fieldPrometheusSchedule = "schedule"
21+
fieldPrometheusQueryOffset = "query_offset"
2122
)
2223

2324
func prometheusInputConfig() *service.ConfigSpec {
@@ -40,17 +41,21 @@ func prometheusInputConfig() *service.ConfigSpec {
4041
).Description("List of PromQL queries to execute."),
4142
service.NewStringField(fieldPrometheusSchedule).
4243
Description("The cron expression to use for the scrape job.").
43-
Examples("*/30 * * * * *", "@every 30s").
44-
Default("*/30 * * * * *"),
45-
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 30 seconds.", `
44+
Examples("0 * * * * *", "@every 1m").
45+
Default("0 * * * * *"),
46+
service.NewDurationField(fieldPrometheusQueryOffset).
47+
Description("Indicates how far back in time the scraping should be done to account for delays in metric availability.").
48+
Default("0s"),
49+
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute and a scrape offset of 30 seconds to account for delays in metric availability.", `
4650
input:
4751
prometheus:
4852
url: "${PROMETHEUS_URL:http://localhost:9090}"
49-
schedule: "* * * * *"
53+
schedule: "0 * * * * *"
54+
query_offset: "1m"
5055
queries:
5156
- query:
5257
name: "node_cpu_usage"
53-
promql: "sum(rate(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
58+
promql: "sum(increase(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
5459
`)
5560
}
5661

@@ -78,14 +83,15 @@ type QueryResult struct {
7883
var _ service.BatchInput = (*prometheusInput)(nil)
7984

8085
type prometheusInput struct {
81-
logger *service.Logger
82-
client v1.API
83-
queries []PromQuery
84-
interval time.Duration
85-
schedule string
86-
scheduler gocron.Scheduler
87-
store map[time.Time][]QueryResult
88-
mu sync.Mutex
86+
logger *service.Logger
87+
client v1.API
88+
queries []PromQuery
89+
interval time.Duration
90+
schedule string
91+
queryOffset time.Duration
92+
scheduler gocron.Scheduler
93+
store map[time.Time][]QueryResult
94+
mu sync.Mutex
8995
}
9096

9197
func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*prometheusInput, error) {
@@ -99,6 +105,11 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
99105
return nil, err
100106
}
101107

108+
queryOffset, err := conf.FieldDuration(fieldPrometheusQueryOffset)
109+
if err != nil {
110+
return nil, err
111+
}
112+
102113
// Parse queries
103114
queriesConf, err := conf.FieldObjectList(fieldPrometheusQueries)
104115
if err != nil {
@@ -160,28 +171,38 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
160171
}
161172

162173
return &prometheusInput{
163-
logger: logger,
164-
client: v1.NewAPI(client),
165-
queries: queries,
166-
interval: interval,
167-
schedule: schedule,
168-
scheduler: scheduler,
169-
store: make(map[time.Time][]QueryResult),
170-
mu: sync.Mutex{},
174+
logger: logger,
175+
client: v1.NewAPI(client),
176+
queries: queries,
177+
interval: interval,
178+
schedule: schedule,
179+
queryOffset: queryOffset,
180+
scheduler: scheduler,
181+
store: make(map[time.Time][]QueryResult),
182+
mu: sync.Mutex{},
171183
}, nil
172184
}
173185

174186
// scrape executes the PromQL queries and stores the results.
175187
func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
176-
in.logger.Debugf("executing PromQL queries at %s", t.Format(time.RFC3339))
188+
// Convert time to UTC
189+
t = t.UTC()
190+
191+
// Apply the metrics scrape offset
192+
queryTime := t.Add(-in.queryOffset)
193+
194+
in.logger.Debugf("executing PromQL queries at %s (using query time %s with offset %s)",
195+
t.Format(time.RFC3339),
196+
queryTime.Format(time.RFC3339),
197+
in.queryOffset)
177198

178199
results := make([]QueryResult, 0, len(in.queries))
179200

180201
for _, query := range in.queries {
181202
in.logger.Tracef("executing query: %s", query.PromQL)
182203

183-
// Execute the PromQL query
184-
result, warnings, err := in.client.Query(ctx, query.PromQL, t)
204+
// Execute the PromQL query with the offset applied time
205+
result, warnings, err := in.client.Query(ctx, query.PromQL, queryTime)
185206
if err != nil {
186207
in.logger.Errorf("error executing query %s: %v", query.PromQL, err)
187208
return err
@@ -203,7 +224,7 @@ func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
203224
results = append(results, QueryResult{
204225
Name: query.Name,
205226
Query: query.PromQL,
206-
Timestamp: t,
227+
Timestamp: queryTime,
207228
Values: vector,
208229
})
209230
}
@@ -266,6 +287,7 @@ func (in *prometheusInput) ReadBatch(ctx context.Context) (service.MessageBatch,
266287
msg := service.NewMessage(encoded)
267288
msg.MetaSet("scrape_time", t.Format(time.RFC3339))
268289
msg.MetaSet("scrape_interval", in.interval.String())
290+
msg.MetaSet("query_offset", in.queryOffset.String())
269291
msg.MetaSet("query_name", result.Name)
270292
batch = append(batch, msg)
271293
}

collector/benthos/input/run_ai.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
2323
fieldResourceType = "resource_type"
2424
fieldMetrics = "metrics"
2525
fieldSchedule = "schedule"
26-
fieldMetricsScrapeOffset = "metrics_scrape_offset"
26+
fieldMetricsOffset = "metrics_offset"
2727
fieldHTTPConfig = "http"
2828
fieldHTTPTimeout = "timeout"
2929
fieldHTTPRetryCount = "retry_count"
@@ -67,7 +67,7 @@ func runAIInputConfig() *service.ConfigSpec {
6767
Description("The cron expression to use for the scrape job.").
6868
Examples("*/30 * * * * *", "@every 30s").
6969
Default("*/30 * * * * *"),
70-
service.NewDurationField(fieldMetricsScrapeOffset).
70+
service.NewDurationField(fieldMetricsOffset).
7171
Description("Indicates how far back in time the scraping window should start to account for delays in metric availability.").
7272
Default("0s"),
7373
service.NewObjectField(fieldHTTPConfig,
@@ -126,16 +126,16 @@ func init() {
126126
var _ service.BatchInput = (*runAIInput)(nil)
127127

128128
type runAIInput struct {
129-
logger *service.Logger
130-
service *runai.Service
131-
resourceType string
132-
metrics []runai.MetricType
133-
interval time.Duration
134-
schedule string
135-
metricsScrapeOffset time.Duration
136-
scheduler gocron.Scheduler
137-
store map[time.Time][]runai.ResourceWithMetrics
138-
mu sync.Mutex
129+
logger *service.Logger
130+
service *runai.Service
131+
resourceType string
132+
metrics []runai.MetricType
133+
interval time.Duration
134+
schedule string
135+
metricsOffset time.Duration
136+
scheduler gocron.Scheduler
137+
store map[time.Time][]runai.ResourceWithMetrics
138+
mu sync.Mutex
139139
}
140140

141141
func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) {
@@ -169,7 +169,7 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
169169
return nil, err
170170
}
171171

172-
metricsScrapeOffset, err := conf.FieldDuration(fieldMetricsScrapeOffset)
172+
metricsOffset, err := conf.FieldDuration(fieldMetricsOffset)
173173
if err != nil {
174174
return nil, err
175175
}
@@ -183,8 +183,8 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
183183
return nil, err
184184
}
185185

186-
// Get current time
187-
now := time.Now()
186+
// Get current time in UTC
187+
now := time.Now().UTC()
188188

189189
// Get next two occurrences
190190
nextRun := cronSchedule.Next(now)
@@ -230,13 +230,13 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
230230
}
231231

232232
return &runAIInput{
233-
logger: logger,
234-
service: service,
235-
resourceType: resourceType,
236-
interval: interval,
237-
schedule: schedule,
238-
metricsScrapeOffset: metricsScrapeOffset,
239-
scheduler: scheduler,
233+
logger: logger,
234+
service: service,
235+
resourceType: resourceType,
236+
interval: interval,
237+
schedule: schedule,
238+
metricsOffset: metricsOffset,
239+
scheduler: scheduler,
240240
metrics: lo.Map(metrics, func(metric string, _ int) runai.MetricType {
241241
return runai.MetricType(metric)
242242
}),
@@ -249,12 +249,15 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
249249
func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
250250
in.logger.Debugf("scraping %s metrics between %s and %s", in.resourceType, t.Add(-in.interval).Format(time.RFC3339), t.Format(time.RFC3339))
251251

252+
startTime := t.Add(-in.interval).Add(-in.metricsOffset)
253+
endTime := t.Add(-in.metricsOffset)
254+
252255
switch in.resourceType {
253256
case "workload":
254257
workloadsWithMetrics, err := in.service.GetAllWorkloadWithMetrics(ctx, runai.MeasurementParams{
255258
MetricType: in.metrics,
256-
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
257-
EndTime: t.Add(-in.metricsScrapeOffset),
259+
StartTime: startTime,
260+
EndTime: endTime,
258261
})
259262
if err != nil {
260263
return err
@@ -268,8 +271,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
268271
case "pod":
269272
podsWithMetrics, err := in.service.GetAllPodWithMetrics(ctx, runai.MeasurementParams{
270273
MetricType: in.metrics,
271-
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
272-
EndTime: t.Add(-in.metricsScrapeOffset),
274+
StartTime: startTime,
275+
EndTime: endTime,
273276
})
274277
if err != nil {
275278
return err
@@ -291,7 +294,8 @@ func (in *runAIInput) Connect(ctx context.Context) error {
291294
gocron.CronJob(in.schedule, true),
292295
gocron.NewTask(
293296
func(ctx context.Context) error {
294-
t := time.Now()
297+
// Get current time in UTC
298+
t := time.Now().UTC()
295299
err := in.scrape(ctx, t)
296300
if err != nil {
297301
in.logger.Errorf("error scraping metrics: %v", err)
@@ -336,6 +340,9 @@ func (in *runAIInput) ReadBatch(ctx context.Context) (service.MessageBatch, serv
336340
msg := service.NewMessage(encoded)
337341
msg.MetaSet("scrape_time", t.Format(time.RFC3339))
338342
msg.MetaSet("scrape_interval", in.interval.String())
343+
msg.MetaSet("metrics_offset", in.metricsOffset.String())
344+
msg.MetaSet("resource_type", in.resourceType)
345+
msg.MetaSet("metrics_time", resourceWithMetrics.GetMetrics().Timestamp.Format(time.RFC3339))
339346
batch = append(batch, msg)
340347
}
341348

collector/benthos/input/runai/metrics.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (s *Service) GetWorkloadMetrics(ctx context.Context, workloadID string, par
103103

104104
for _, measurement := range result.Measurements {
105105
if len(measurement.Values) > 0 {
106-
v, err := strconv.ParseFloat(measurement.Values[0].Value, 64)
106+
v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64)
107107
if err != nil {
108108
return m, fmt.Errorf("failed to parse metric value: %w", err)
109109
}
@@ -152,22 +152,22 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem
152152
workloadsWithMetrics := make([]WorkloadWithMetrics, len(workloads))
153153
for i, workload := range workloads {
154154
metrics := Metrics{
155-
Timestamp: params.StartTime,
155+
Timestamp: params.StartTime.UTC(),
156156
Values: make(map[MetricType]float64),
157157
}
158158

159159
// We chunk the metric types to not exceed the max number of metrics per request
160160
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
161161
m, err := s.GetWorkloadMetrics(ctx, workload.ID, MeasurementParams{
162162
MetricType: metricTypes,
163-
StartTime: params.StartTime,
164-
EndTime: params.EndTime,
163+
StartTime: params.StartTime.UTC(),
164+
EndTime: params.EndTime.UTC(),
165165
})
166166
if err != nil {
167167
return nil, err
168168
}
169169

170-
metrics.Timestamp = m.Timestamp
170+
metrics.Timestamp = m.Timestamp.UTC()
171171
for mt, v := range m.Values {
172172
metrics.Values[mt] = v
173173
}
@@ -240,7 +240,7 @@ func (s *Service) GetPodMetrics(ctx context.Context, workloadID string, podID st
240240

241241
for _, measurement := range result.Measurements {
242242
if len(measurement.Values) > 0 {
243-
v, err := strconv.ParseFloat(measurement.Values[0].Value, 64)
243+
v, err := strconv.ParseFloat(measurement.Values[len(measurement.Values)-1].Value, 64)
244244
if err != nil {
245245
return m, fmt.Errorf("failed to parse metric value: %w", err)
246246
}
@@ -289,22 +289,22 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa
289289
podsWithMetrics := make([]PodWithMetrics, len(pods))
290290
for i, pod := range pods {
291291
metrics := Metrics{
292-
Timestamp: params.StartTime,
292+
Timestamp: params.StartTime.UTC(),
293293
Values: make(map[MetricType]float64),
294294
}
295295

296296
// We chunk the metric types to not exceed the max number of metrics per request
297297
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
298298
m, err := s.GetPodMetrics(ctx, pod.WorkloadID, pod.ID, MeasurementParams{
299299
MetricType: metricTypes,
300-
StartTime: params.StartTime,
301-
EndTime: params.EndTime,
300+
StartTime: params.StartTime.UTC(),
301+
EndTime: params.EndTime.UTC(),
302302
})
303303
if err != nil {
304304
return nil, err
305305
}
306306

307-
metrics.Timestamp = m.Timestamp
307+
metrics.Timestamp = m.Timestamp.UTC()
308308
for mt, v := range m.Values {
309309
metrics.Values[mt] = v
310310
}

collector/benthos/input/schedule.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (in *scheduleInput) ReadBatch(ctx context.Context) (service.MessageBatch, s
6666

6767
select {
6868
case v := <-in.timer.C:
69-
t = v
69+
t = v.UTC()
7070
default:
7171
return nil, func(context.Context, error) error { return nil }, nil
7272
}

0 commit comments

Comments (0)