Skip to content

Commit 5179766

Browse files
committed
feat(collector): always use UTC timestamps
1 parent 59e49c2 commit 5179766

File tree

4 files changed

+21
-17
lines changed

4 files changed

+21
-17
lines changed

collector/benthos/input/prometheus.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ func prometheusInputConfig() *service.ConfigSpec {
4040
).Description("List of PromQL queries to execute."),
4141
service.NewStringField(fieldPrometheusSchedule).
4242
Description("The cron expression to use for the scrape job.").
43-
Examples("*/30 * * * * *", "@every 30s").
44-
Default("*/30 * * * * *"),
45-
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 30 seconds.", `
43+
Examples("0 * * * * *", "@every 1m").
44+
Default("0 * * * * *"),
45+
).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute.", `
4646
input:
4747
prometheus:
4848
url: "${PROMETHEUS_URL:http://localhost:9090}"
49-
schedule: "* * * * *"
49+
schedule: "0 * * * * *"
5050
queries:
5151
- query:
5252
name: "node_cpu_usage"
53-
promql: "sum(rate(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
53+
promql: "sum(increase(node_cpu_seconds_total{mode!='idle'}[1m])) by (instance)"
5454
`)
5555
}
5656

@@ -173,6 +173,9 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
173173

174174
// scrape executes the PromQL queries and stores the results.
175175
func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
176+
// Convert time to UTC
177+
t = t.UTC()
178+
176179
in.logger.Debugf("executing PromQL queries at %s", t.Format(time.RFC3339))
177180

178181
results := make([]QueryResult, 0, len(in.queries))

collector/benthos/input/run_ai.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
183183
return nil, err
184184
}
185185

186-
// Get current time
187-
now := time.Now()
186+
// Get current time in UTC
187+
now := time.Now().UTC()
188188

189189
// Get next two occurrences
190190
nextRun := cronSchedule.Next(now)
@@ -291,7 +291,8 @@ func (in *runAIInput) Connect(ctx context.Context) error {
291291
gocron.CronJob(in.schedule, true),
292292
gocron.NewTask(
293293
func(ctx context.Context) error {
294-
t := time.Now()
294+
// Get current time in UTC
295+
t := time.Now().UTC()
295296
err := in.scrape(ctx, t)
296297
if err != nil {
297298
in.logger.Errorf("error scraping metrics: %v", err)

collector/benthos/input/runai/metrics.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,22 +152,22 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem
152152
workloadsWithMetrics := make([]WorkloadWithMetrics, len(workloads))
153153
for i, workload := range workloads {
154154
metrics := Metrics{
155-
Timestamp: params.StartTime,
155+
Timestamp: params.StartTime.UTC(),
156156
Values: make(map[MetricType]float64),
157157
}
158158

159159
// We chunk the metric types to not exceed the max number of metrics per request
160160
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
161161
m, err := s.GetWorkloadMetrics(ctx, workload.ID, MeasurementParams{
162162
MetricType: metricTypes,
163-
StartTime: params.StartTime,
164-
EndTime: params.EndTime,
163+
StartTime: params.StartTime.UTC(),
164+
EndTime: params.EndTime.UTC(),
165165
})
166166
if err != nil {
167167
return nil, err
168168
}
169169

170-
metrics.Timestamp = m.Timestamp
170+
metrics.Timestamp = m.Timestamp.UTC()
171171
for mt, v := range m.Values {
172172
metrics.Values[mt] = v
173173
}
@@ -289,22 +289,22 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa
289289
podsWithMetrics := make([]PodWithMetrics, len(pods))
290290
for i, pod := range pods {
291291
metrics := Metrics{
292-
Timestamp: params.StartTime,
292+
Timestamp: params.StartTime.UTC(),
293293
Values: make(map[MetricType]float64),
294294
}
295295

296296
// We chunk the metric types to not exceed the max number of metrics per request
297297
for _, metricTypes := range lo.Chunk(params.MetricType, 9) {
298298
m, err := s.GetPodMetrics(ctx, pod.WorkloadID, pod.ID, MeasurementParams{
299299
MetricType: metricTypes,
300-
StartTime: params.StartTime,
301-
EndTime: params.EndTime,
300+
StartTime: params.StartTime.UTC(),
301+
EndTime: params.EndTime.UTC(),
302302
})
303303
if err != nil {
304304
return nil, err
305305
}
306306

307-
metrics.Timestamp = m.Timestamp
307+
metrics.Timestamp = m.Timestamp.UTC()
308308
for mt, v := range m.Values {
309309
metrics.Values[mt] = v
310310
}

collector/benthos/input/schedule.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (in *scheduleInput) ReadBatch(ctx context.Context) (service.MessageBatch, s
6666

6767
select {
6868
case v := <-in.timer.C:
69-
t = v
69+
t = v.UTC()
7070
default:
7171
return nil, func(context.Context, error) error { return nil }, nil
7272
}

0 commit comments

Comments
 (0)