Skip to content

PMM-13830 AWS SDK v2. #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions basic/scraper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package basic

import (
"context"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -25,17 +27,16 @@ type Scraper struct {
ch chan<- prometheus.Metric

// internal
svc *cloudwatch.CloudWatch
svc *cloudwatch.Client
constLabels prometheus.Labels
}

func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prometheus.Metric) *Scraper {
// Create CloudWatch client
sess, _ := collector.sessions.GetSession(instance.Region, instance.Instance)
if sess == nil {
cfg, _ := collector.sessions.GetConfig(instance.Region, instance.Instance)
if cfg == nil {
return nil
}
svc := cloudwatch.New(sess)
svc := cloudwatch.NewFromConfig(*cfg)

constLabels := prometheus.Labels{
"region": instance.Region,
Expand All @@ -61,15 +62,13 @@ func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prome
}
}

func getLatestDatapoint(datapoints []*cloudwatch.Datapoint) *cloudwatch.Datapoint {
var latest *cloudwatch.Datapoint = nil

for dp := range datapoints {
if latest == nil || latest.Timestamp.Before(*datapoints[dp].Timestamp) {
latest = datapoints[dp]
// getLatestDatapoint returns a pointer to the element of datapoints with the
// most recent Timestamp, or nil when the slice is empty.
//
// In the AWS SDK v2, Datapoint.Timestamp is a *time.Time; datapoints with a
// nil Timestamp are skipped so a malformed CloudWatch response cannot cause
// a nil-pointer dereference here.
func getLatestDatapoint(datapoints []types.Datapoint) *types.Datapoint {
	var latest *types.Datapoint
	for i := range datapoints {
		if datapoints[i].Timestamp == nil {
			continue // defensive: cannot order a datapoint without a timestamp
		}
		if latest == nil || latest.Timestamp.Before(*datapoints[i].Timestamp) {
			latest = &datapoints[i]
		}
	}
	return latest
}

Expand Down Expand Up @@ -97,45 +96,34 @@ func (s *Scraper) scrapeMetric(metric Metric) error {
end := now.Add(-Delay)

params := &cloudwatch.GetMetricStatisticsInput{
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),

Period: aws.Int64(int64(Period.Seconds())),
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),
Period: aws.Int32(int32(Period.Seconds())),
MetricName: aws.String(metric.cwName),
Namespace: aws.String("AWS/RDS"),
Dimensions: []*cloudwatch.Dimension{},
Statistics: aws.StringSlice([]string{"Average"}),
Unit: nil,
Dimensions: []types.Dimension{{
Name: aws.String("DBInstanceIdentifier"),
Value: aws.String(s.instance.Instance),
}},
Statistics: []types.Statistic{types.StatisticAverage},
}

params.Dimensions = append(params.Dimensions, &cloudwatch.Dimension{
Name: aws.String("DBInstanceIdentifier"),
Value: aws.String(s.instance.Instance),
})

// Call CloudWatch to gather the datapoints
resp, err := s.svc.GetMetricStatistics(params)
resp, err := s.svc.GetMetricStatistics(context.Background(), params)
if err != nil {
return err
}

// There's nothing in there, don't publish the metric
if len(resp.Datapoints) == 0 {
return nil
}

// Pick the latest datapoint
dp := getLatestDatapoint(resp.Datapoints)

// Get the metric.
v := aws.Float64Value(dp.Average)
v := aws.ToFloat64(dp.Average)
switch metric.cwName {
case "EngineUptime":
// "Fake EngineUptime -> node_boot_time with time.Now().Unix() - EngineUptime."
v = float64(time.Now().Unix() - int64(v))
}

// Send metric.
s.ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(metric.prometheusName, metric.prometheusHelp, nil, s.constLabels),
prometheus.GaugeValue,
Expand Down
128 changes: 67 additions & 61 deletions basic/testdata/all.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion enhanced/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func NewCollector(sessions *sessions.Sessions, logger log.Logger) *Collector {

for session, instances := range sessions.AllSessions() {
enabledInstances := getEnabledInstances(instances)
s := newScraper(session, enabledInstances, logger)
cfg := sessions.Configs[session]
s := newScraper(cfg, enabledInstances, logger)

interval := maxInterval
for _, instance := range enabledInstances {
Expand Down
48 changes: 22 additions & 26 deletions enhanced/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -19,14 +18,14 @@ import (
type scraper struct {
instances []sessions.Instance
logStreamNames []string
svc *cloudwatchlogs.CloudWatchLogs
svc *cloudwatchlogs.Client
nextStartTime time.Time
logger log.Logger

testDisallowUnknownFields bool // for tests only
}

func newScraper(session *session.Session, instances []sessions.Instance, logger log.Logger) *scraper {
func newScraper(cfg aws.Config, instances []sessions.Instance, logger log.Logger) *scraper {
logStreamNames := make([]string, 0, len(instances))
for _, instance := range instances {
logStreamNames = append(logStreamNames, instance.ResourceID)
Expand All @@ -35,7 +34,7 @@ func newScraper(session *session.Session, instances []sessions.Instance, logger
return &scraper{
instances: instances,
logStreamNames: logStreamNames,
svc: cloudwatchlogs.New(session),
svc: cloudwatchlogs.NewFromConfig(cfg),
nextStartTime: time.Now().Add(-3 * time.Minute).Round(0), // strip monotonic clock reading
logger: log.With(logger, "component", "enhanced"),
}
Expand Down Expand Up @@ -63,7 +62,6 @@ func (s *scraper) start(ctx context.Context, interval time.Duration, ch chan<- m

// scrape performs a single scrape.
func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, map[string]string) {

allMetrics := make(map[string]map[time.Time][]prometheus.Metric) // ResourceID -> event timestamp -> metrics
allMessages := make(map[string]map[time.Time]string) // ResourceID -> event timestamp -> message

Expand All @@ -79,24 +77,29 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m

input := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String("RDSOSMetrics"),
LogStreamNames: aws.StringSlice(s.logStreamNames[sliceStart:sliceEnd]),
StartTime: aws.Int64(aws.TimeUnixMilli(s.nextStartTime)),
LogStreamNames: s.logStreamNames[sliceStart:sliceEnd],
StartTime: aws.Int64(s.nextStartTime.UnixMilli()),
}

level.Debug(log.With(s.logger, "next_start", s.nextStartTime.UTC(), "since_last", time.Since(s.nextStartTime))).Log("msg", "Requesting metrics")

// collect all returned events and metrics/messages
collectAllMetrics := func(output *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(s.svc, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
level.Error(s.logger).Log("msg", "Failed to filter log events.", "error", err)
break
}
for _, event := range output.Events {
l := log.With(s.logger,
"EventId", *event.EventId,
"LogStreamName", *event.LogStreamName,
"Timestamp", aws.MillisecondsTimeValue(event.Timestamp).UTC(),
"IngestionTime", aws.MillisecondsTimeValue(event.IngestionTime).UTC())
"EventId", aws.ToString(event.EventId),
"LogStreamName", aws.ToString(event.LogStreamName),
"Timestamp", time.UnixMilli(aws.ToInt64(event.Timestamp)).UTC(),
"IngestionTime", time.UnixMilli(aws.ToInt64(event.IngestionTime)).UTC())

var instance *sessions.Instance
for _, i := range s.instances {
if i.ResourceID == *event.LogStreamName {
if i.ResourceID == aws.ToString(event.LogStreamName) {
instance = &i
break
}
Expand All @@ -112,8 +115,7 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
}
l = log.With(l, "region", instance.Region, "instance", instance.Instance)

// l.Debugf("Message:\n%s", *event.Message)
osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
osMetrics, err := parseOSMetrics([]byte(aws.ToString(event.Message)), s.testDisallowUnknownFields)
if err != nil {
// only for tests
if s.testDisallowUnknownFields {
Expand All @@ -123,9 +125,8 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
level.Error(l).Log("msg", "Failed to parse metrics.", "error", err)
continue
}
// l.Debugf("OS Metrics:\n%#v", osMetrics)

timestamp := aws.MillisecondsTimeValue(event.Timestamp).UTC()
timestamp := time.UnixMilli(aws.ToInt64(event.Timestamp)).UTC()
level.Debug(l).Log("msg", fmt.Sprintf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp))

if allMetrics[instance.ResourceID] == nil {
Expand All @@ -136,13 +137,8 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
if allMessages[instance.ResourceID] == nil {
allMessages[instance.ResourceID] = make(map[time.Time]string)
}
allMessages[instance.ResourceID][timestamp] = *event.Message
allMessages[instance.ResourceID][timestamp] = aws.ToString(event.Message)
}

return true // continue pagination
}
if err := s.svc.FilterLogEventsPagesWithContext(ctx, input, collectAllMetrics); err != nil {
level.Error(s.logger).Log("msg", "Failed to filter log events.", "error", err)
}
}
// get better times
Expand Down
6 changes: 3 additions & 3 deletions enhanced/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestScraper(t *testing.T) {
for session, instances := range sess.AllSessions() {
session, instances := session, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
// test that there are no new metrics
s := newScraper(session, instances, logger)
cfg := sess.Configs[session]
s := newScraper(cfg, instances, logger)
s.testDisallowUnknownFields = true
metrics, messages := s.scrape(context.Background())
require.Len(t, metrics, len(instances))
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestScraperDisableEnhancedMetrics(t *testing.T) {
for session, instances := range sess.AllSessions() {
session, instances := session, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
s := newScraper(session, instances, logger)
s := newScraper(sess.Configs[session], instances, logger)
s.testDisallowUnknownFields = true
metrics, _ := s.scrape(context.Background())

Expand Down
Loading