
Commit 42ed986

feat(collector): add metrics and query offset to inputs
1 parent: 5179766

File tree

2 files changed: 73 additions, 48 deletions


collector/benthos/input/prometheus.go

Lines changed: 43 additions & 24 deletions
@@ -15,9 +15,10 @@ import (
 )
 
 const (
-	fieldPrometheusURL      = "url"
-	fieldPrometheusQueries  = "queries"
-	fieldPrometheusSchedule = "schedule"
+	fieldPrometheusURL         = "url"
+	fieldPrometheusQueries     = "queries"
+	fieldPrometheusSchedule    = "schedule"
+	fieldPrometheusQueryOffset = "query_offset"
 )
 
 func prometheusInputConfig() *service.ConfigSpec {
@@ -42,11 +43,15 @@ func prometheusInputConfig() *service.ConfigSpec {
 			Description("The cron expression to use for the scrape job.").
 			Examples("0 * * * * *", "@every 1m").
 			Default("0 * * * * *"),
-	).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute.", `
+		service.NewDurationField(fieldPrometheusQueryOffset).
+			Description("Indicates how far back in time the scraping should be done to account for delays in metric availability.").
+			Default("0s"),
+	).Example("Basic Configuration", "Collect Prometheus metrics with a scrape interval of 1 minute and a query offset of 1 minute to account for delays in metric availability.", `
 input:
   prometheus:
     url: "${PROMETHEUS_URL:http://localhost:9090}"
     schedule: "0 * * * * *"
+    query_offset: "1m"
     queries:
       - query:
           name: "node_cpu_usage"
@@ -78,14 +83,15 @@ type QueryResult struct {
 var _ service.BatchInput = (*prometheusInput)(nil)
 
 type prometheusInput struct {
-	logger    *service.Logger
-	client    v1.API
-	queries   []PromQuery
-	interval  time.Duration
-	schedule  string
-	scheduler gocron.Scheduler
-	store     map[time.Time][]QueryResult
-	mu        sync.Mutex
+	logger      *service.Logger
+	client      v1.API
+	queries     []PromQuery
+	interval    time.Duration
+	schedule    string
+	queryOffset time.Duration
+	scheduler   gocron.Scheduler
+	store       map[time.Time][]QueryResult
+	mu          sync.Mutex
 }
 
 func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*prometheusInput, error) {
@@ -99,6 +105,11 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
 		return nil, err
 	}
 
+	queryOffset, err := conf.FieldDuration(fieldPrometheusQueryOffset)
+	if err != nil {
+		return nil, err
+	}
+
 	// Parse queries
 	queriesConf, err := conf.FieldObjectList(fieldPrometheusQueries)
 	if err != nil {
@@ -160,14 +171,15 @@ func newPrometheusInput(conf *service.ParsedConfig, logger *service.Logger) (*pr
 	}
 
 	return &prometheusInput{
-		logger:    logger,
-		client:    v1.NewAPI(client),
-		queries:   queries,
-		interval:  interval,
-		schedule:  schedule,
-		scheduler: scheduler,
-		store:     make(map[time.Time][]QueryResult),
-		mu:        sync.Mutex{},
+		logger:      logger,
+		client:      v1.NewAPI(client),
+		queries:     queries,
+		interval:    interval,
+		schedule:    schedule,
+		queryOffset: queryOffset,
+		scheduler:   scheduler,
+		store:       make(map[time.Time][]QueryResult),
+		mu:          sync.Mutex{},
 	}, nil
 }

@@ -176,15 +188,21 @@ func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
 	// Convert time to UTC
 	t = t.UTC()
 
-	in.logger.Debugf("executing PromQL queries at %s", t.Format(time.RFC3339))
+	// Apply the query offset
+	queryTime := t.Add(-in.queryOffset)
+
+	in.logger.Debugf("executing PromQL queries at %s (using query time %s with offset %s)",
+		t.Format(time.RFC3339),
+		queryTime.Format(time.RFC3339),
+		in.queryOffset)
 
 	results := make([]QueryResult, 0, len(in.queries))
 
 	for _, query := range in.queries {
 		in.logger.Tracef("executing query: %s", query.PromQL)
 
-		// Execute the PromQL query
-		result, warnings, err := in.client.Query(ctx, query.PromQL, t)
+		// Execute the PromQL query at the offset-adjusted time
+		result, warnings, err := in.client.Query(ctx, query.PromQL, queryTime)
 		if err != nil {
 			in.logger.Errorf("error executing query %s: %v", query.PromQL, err)
 			return err
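The offset arithmetic above is plain time.Time subtraction. A small sketch with hypothetical values, not part of the commit, of the evaluation time computed for one scheduler tick:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assume the cron job fires at 12:00:00Z with query_offset "30s".
	tick := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	queryOffset := 30 * time.Second

	// Same computation as scrape(): shift the evaluation time back.
	queryTime := tick.Add(-queryOffset)

	fmt.Println(queryTime.Format(time.RFC3339)) // 2024-01-01T11:59:30Z
}

Each instant query is then evaluated at queryTime rather than at the tick time, and queryTime is also what gets recorded as the result's Timestamp below.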
@@ -206,7 +224,7 @@ func (in *prometheusInput) scrape(ctx context.Context, t time.Time) error {
 		results = append(results, QueryResult{
 			Name:      query.Name,
 			Query:     query.PromQL,
-			Timestamp: t,
+			Timestamp: queryTime,
 			Values:    vector,
 		})
 	}
@@ -269,6 +287,7 @@ func (in *prometheusInput) ReadBatch(ctx context.Context) (service.MessageBatch,
 		msg := service.NewMessage(encoded)
 		msg.MetaSet("scrape_time", t.Format(time.RFC3339))
 		msg.MetaSet("scrape_interval", in.interval.String())
+		msg.MetaSet("query_offset", in.queryOffset.String())
 		msg.MetaSet("query_name", result.Name)
 		batch = append(batch, msg)
 	}
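Downstream stages can recover the applied offset from the new metadata key. A hedged sketch of a hypothetical helper, assuming benthos' public service.Message API and the redpanda-data v4 import path (neither is shown in this commit):

package example

import (
	"fmt"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"
)

// appliedQueryOffset reads back the query_offset metadata set in
// ReadBatch above and returns it as a duration.
func appliedQueryOffset(msg *service.Message) (time.Duration, error) {
	raw, ok := msg.MetaGet("query_offset")
	if !ok {
		return 0, fmt.Errorf("query_offset metadata missing")
	}
	return time.ParseDuration(raw)
}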

collector/benthos/input/run_ai.go

Lines changed: 30 additions & 24 deletions
@@ -23,7 +23,7 @@ const (
 	fieldResourceType        = "resource_type"
 	fieldMetrics             = "metrics"
 	fieldSchedule            = "schedule"
-	fieldMetricsScrapeOffset = "metrics_scrape_offset"
+	fieldMetricsOffset       = "metrics_offset"
 	fieldHTTPConfig          = "http"
 	fieldHTTPTimeout         = "timeout"
 	fieldHTTPRetryCount      = "retry_count"
@@ -67,7 +67,7 @@ func runAIInputConfig() *service.ConfigSpec {
 			Description("The cron expression to use for the scrape job.").
 			Examples("*/30 * * * * *", "@every 30s").
 			Default("*/30 * * * * *"),
-		service.NewDurationField(fieldMetricsScrapeOffset).
+		service.NewDurationField(fieldMetricsOffset).
 			Description("Indicates how far back in time the scraping window should start to account for delays in metric availability.").
 			Default("0s"),
 		service.NewObjectField(fieldHTTPConfig,
@@ -126,16 +126,16 @@ func init() {
 var _ service.BatchInput = (*runAIInput)(nil)
 
 type runAIInput struct {
-	logger              *service.Logger
-	service             *runai.Service
-	resourceType        string
-	metrics             []runai.MetricType
-	interval            time.Duration
-	schedule            string
-	metricsScrapeOffset time.Duration
-	scheduler           gocron.Scheduler
-	store               map[time.Time][]runai.ResourceWithMetrics
-	mu                  sync.Mutex
+	logger        *service.Logger
+	service       *runai.Service
+	resourceType  string
+	metrics       []runai.MetricType
+	interval      time.Duration
+	schedule      string
+	metricsOffset time.Duration
+	scheduler     gocron.Scheduler
+	store         map[time.Time][]runai.ResourceWithMetrics
+	mu            sync.Mutex
 }
 
 func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) {
@@ -169,7 +169,7 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
 		return nil, err
 	}
 
-	metricsScrapeOffset, err := conf.FieldDuration(fieldMetricsScrapeOffset)
+	metricsOffset, err := conf.FieldDuration(fieldMetricsOffset)
 	if err != nil {
 		return nil, err
 	}
@@ -230,13 +230,13 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
 	}
 
 	return &runAIInput{
-		logger:              logger,
-		service:             service,
-		resourceType:        resourceType,
-		interval:            interval,
-		schedule:            schedule,
-		metricsScrapeOffset: metricsScrapeOffset,
-		scheduler:           scheduler,
+		logger:        logger,
+		service:       service,
+		resourceType:  resourceType,
+		interval:      interval,
+		schedule:      schedule,
+		metricsOffset: metricsOffset,
+		scheduler:     scheduler,
 		metrics: lo.Map(metrics, func(metric string, _ int) runai.MetricType {
 			return runai.MetricType(metric)
 		}),
@@ -249,12 +249,15 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
 func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
 	in.logger.Debugf("scraping %s metrics between %s and %s", in.resourceType, t.Add(-in.interval).Format(time.RFC3339), t.Format(time.RFC3339))
 
+	startTime := t.Add(-in.interval).Add(-in.metricsOffset)
+	endTime := t.Add(-in.metricsOffset)
+
 	switch in.resourceType {
 	case "workload":
 		workloadsWithMetrics, err := in.service.GetAllWorkloadWithMetrics(ctx, runai.MeasurementParams{
 			MetricType: in.metrics,
-			StartTime:  t.Add(-in.interval).Add(-in.metricsScrapeOffset),
-			EndTime:    t.Add(-in.metricsScrapeOffset),
+			StartTime:  startTime,
+			EndTime:    endTime,
 		})
 		if err != nil {
 			return err
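With the offset applied, the scrape window keeps its width of one interval but slides back in time by metrics_offset. A small sketch with hypothetical values, not part of the commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assume a tick at 12:00:00Z, a 30s interval, and metrics_offset "2m".
	tick := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	interval := 30 * time.Second
	metricsOffset := 2 * time.Minute

	// Same computation as scrape(): both bounds shift by the offset,
	// so the window width stays equal to the interval.
	startTime := tick.Add(-interval).Add(-metricsOffset)
	endTime := tick.Add(-metricsOffset)

	fmt.Println(startTime.Format(time.RFC3339)) // 2024-01-01T11:57:30Z
	fmt.Println(endTime.Format(time.RFC3339))   // 2024-01-01T11:58:00Z
}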
@@ -268,8 +271,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
 	case "pod":
 		podsWithMetrics, err := in.service.GetAllPodWithMetrics(ctx, runai.MeasurementParams{
 			MetricType: in.metrics,
-			StartTime:  t.Add(-in.interval).Add(-in.metricsScrapeOffset),
-			EndTime:    t.Add(-in.metricsScrapeOffset),
+			StartTime:  startTime,
+			EndTime:    endTime,
 		})
 		if err != nil {
 			return err
@@ -337,6 +340,9 @@ func (in *runAIInput) ReadBatch(ctx context.Context) (service.MessageBatch, serv
 		msg := service.NewMessage(encoded)
 		msg.MetaSet("scrape_time", t.Format(time.RFC3339))
 		msg.MetaSet("scrape_interval", in.interval.String())
+		msg.MetaSet("metrics_offset", in.metricsOffset.String())
+		msg.MetaSet("resource_type", in.resourceType)
+		msg.MetaSet("metrics_time", resourceWithMetrics.GetMetrics().Timestamp.Format(time.RFC3339))
 		batch = append(batch, msg)
 	}
