Skip to content

Commit 26c389a

Browse files
OTLP Ingestion (#5813)
* Initial OTLP ingest support Signed-off-by: Anthony J Mirabella <[email protected]> * Add resource attribute conversion Signed-off-by: Anthony J Mirabella <[email protected]> * Fix lint Signed-off-by: Friedrich Gonzalez <[email protected]> * Put under /api/v1/otlp Signed-off-by: Friedrich Gonzalez <[email protected]> * Re-use DecodeOTLPWriteRequest Signed-off-by: Friedrich Gonzalez <[email protected]> * Tests Signed-off-by: Friedrich Gonzalez <[email protected]> * Integration test Signed-off-by: Friedrich Gonzalez <[email protected]> * Fix lint and minimal docs Signed-off-by: Friedrich Gonzalez <[email protected]> * Catch error Signed-off-by: Friedrich Gonzalez <[email protected]> --------- Signed-off-by: Anthony J Mirabella <[email protected]> Signed-off-by: Friedrich Gonzalez <[email protected]> Co-authored-by: Anthony J Mirabella <[email protected]>
1 parent a62d6ae commit 26c389a

File tree

9 files changed

+459
-1
lines changed

9 files changed

+459
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817
1212
* [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828
1313
* [CHANGE] Compactor: Don't halt compactor when overlapped source blocks detected. #5854
14+
* [FEATURE] OTLP ingestion experimental. #5813
1415
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
1516
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
1617
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731

docs/api/_index.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
2626
| [Pprof](#pprof) | _All services_ || `GET /debug/pprof` |
2727
| [Fgprof](#fgprof) | _All services_ || `GET /debug/fgprof` |
2828
| [Remote write](#remote-write) | Distributor || `POST /api/v1/push` |
29+
| [OTLP receiver](#otlp-receiver) | Distributor || `POST /api/v1/otlp/v1/metrics` |
2930
| [Tenants stats](#tenants-stats) | Distributor || `GET /distributor/all_user_stats` |
3031
| [HA tracker status](#ha-tracker-status) | Distributor || `GET /distributor/ha_tracker` |
3132
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |

docs/configuration/v1-guarantees.md

+1
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,4 @@ Currently experimental features are:
112112
- `-ruler.ring.final-sleep` (duration) CLI flag
113113
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
114114
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
115+
- OTLP Receiver

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
github.com/cespare/xxhash/v2 v2.2.0
8282
github.com/google/go-cmp v0.6.0
8383
github.com/sercand/kuberesolver/v4 v4.0.0
84+
go.opentelemetry.io/collector/pdata v1.3.0
8485
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
8586
google.golang.org/protobuf v1.33.0
8687
)
@@ -208,7 +209,6 @@ require (
208209
go.mongodb.org/mongo-driver v1.14.0 // indirect
209210
go.opencensus.io v0.24.0 // indirect
210211
go.opentelemetry.io/collector/featuregate v1.3.0 // indirect
211-
go.opentelemetry.io/collector/pdata v1.3.0 // indirect
212212
go.opentelemetry.io/collector/semconv v0.96.0 // indirect
213213
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
214214
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect

integration/e2ecortex/client.go

+75
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ import (
2727
"github.com/prometheus/prometheus/storage/remote"
2828
yaml "gopkg.in/yaml.v3"
2929

30+
"go.opentelemetry.io/collector/pdata/pcommon"
31+
"go.opentelemetry.io/collector/pdata/pmetric"
32+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
33+
3034
"github.com/cortexproject/cortex/pkg/ruler"
3135
"github.com/cortexproject/cortex/pkg/util/backoff"
3236
)
@@ -142,6 +146,77 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
142146
return res, nil
143147
}
144148

149+
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
150+
var metricName string
151+
attributes := make(map[string]any)
152+
for _, label := range ts.Labels {
153+
if label.Name == model.MetricNameLabel {
154+
metricName = label.Value
155+
} else {
156+
attributes[label.Name] = label.Value
157+
}
158+
}
159+
return metricName, attributes
160+
}
161+
162+
func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
163+
newMetric.SetEmptyGauge()
164+
for _, sample := range samples {
165+
datapoint := newMetric.Gauge().DataPoints().AppendEmpty()
166+
datapoint.SetDoubleValue(sample.Value)
167+
datapoint.SetTimestamp(pcommon.Timestamp(sample.Timestamp * time.Millisecond.Nanoseconds()))
168+
err := datapoint.Attributes().FromRaw(attributes)
169+
if err != nil {
170+
panic(err)
171+
}
172+
}
173+
}
174+
175+
// Convert Timeseries to Metrics
176+
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
177+
metrics := pmetric.NewMetrics()
178+
for _, ts := range timeseries {
179+
metricName, attributes := getNameAndAttributes(ts)
180+
newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
181+
newMetric.SetName(metricName)
182+
//TODO Set description for new metric
183+
//TODO Set unit for new metric
184+
createDatapointsGauge(newMetric, attributes, ts.Samples)
185+
//TODO(friedrichg): Add support for histograms
186+
}
187+
return metrics
188+
}
189+
190+
// Push series to OTLP endpoint
191+
func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {
192+
193+
data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto()
194+
if err != nil {
195+
return nil, err
196+
}
197+
198+
// Create HTTP request
199+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data))
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
req.Header.Set("Content-Type", "application/x-protobuf")
205+
req.Header.Set("X-Scope-OrgID", c.orgID)
206+
207+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
208+
defer cancel()
209+
210+
// Execute HTTP request
211+
res, err := c.httpClient.Do(req.WithContext(ctx))
212+
if err != nil {
213+
return nil, err
214+
}
215+
216+
defer res.Body.Close()
217+
return res, nil
218+
}
219+
145220
// Query runs an instant query.
146221
func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
147222
value, _, err := c.querierClient.Query(context.Background(), query, ts)

integration/otlp_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
"time"
10+
11+
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/cortexproject/cortex/integration/e2e"
17+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
18+
"github.com/cortexproject/cortex/integration/e2ecortex"
19+
)
20+
21+
func TestOTLP(t *testing.T) {
22+
s, err := e2e.NewScenario(networkName)
23+
require.NoError(t, err)
24+
defer s.Close()
25+
26+
// Start dependencies.
27+
minio := e2edb.NewMinio(9000, bucketName)
28+
require.NoError(t, s.StartAndWaitReady(minio))
29+
30+
// Start Cortex components.
31+
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile))
32+
33+
// Start Cortex in single binary mode, reading the config from file and overwriting
34+
// the backend config to make it work with Minio.
35+
flags := map[string]string{
36+
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
37+
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
38+
"-blocks-storage.s3.bucket-name": bucketName,
39+
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
40+
"-blocks-storage.s3.insecure": "true",
41+
}
42+
43+
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095)
44+
require.NoError(t, s.StartAndWaitReady(cortex))
45+
46+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
47+
require.NoError(t, err)
48+
49+
// Push some series to Cortex.
50+
now := time.Now()
51+
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
52+
53+
res, err := c.OTLP(series)
54+
require.NoError(t, err)
55+
require.Equal(t, 200, res.StatusCode)
56+
57+
// Query the series.
58+
result, err := c.Query("series_1", now)
59+
require.NoError(t, err)
60+
require.Equal(t, model.ValVector, result.Type())
61+
assert.Equal(t, expectedVector, result.(model.Vector))
62+
63+
labelValues, err := c.LabelValues("foo", time.Time{}, time.Time{}, nil)
64+
require.NoError(t, err)
65+
require.Equal(t, model.LabelValues{"bar"}, labelValues)
66+
67+
labelNames, err := c.LabelNames(time.Time{}, time.Time{})
68+
require.NoError(t, err)
69+
require.Equal(t, []string{"__name__", "foo"}, labelNames)
70+
71+
// Check that a range query does not return an error to sanity check the queryrange tripperware.
72+
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
73+
require.NoError(t, err)
74+
75+
//TODO(friedrichg): test histograms
76+
}

pkg/api/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
261261
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
262262

263263
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
264+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
264265

265266
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
266267
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")

pkg/util/push/otlp.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package push
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/go-kit/log/level"
7+
"github.com/prometheus/prometheus/model/labels"
8+
"github.com/prometheus/prometheus/prompb"
9+
"github.com/prometheus/prometheus/storage/remote"
10+
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
11+
"github.com/weaveworks/common/httpgrpc"
12+
"github.com/weaveworks/common/middleware"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
15+
16+
"github.com/cortexproject/cortex/pkg/cortexpb"
17+
"github.com/cortexproject/cortex/pkg/util"
18+
"github.com/cortexproject/cortex/pkg/util/log"
19+
)
20+
21+
// OTLPHandler is a http.Handler which accepts OTLP metrics.
22+
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
23+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
24+
ctx := r.Context()
25+
logger := log.WithContext(ctx, log.Logger)
26+
if sourceIPs != nil {
27+
source := sourceIPs.Get(r)
28+
if source != "" {
29+
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
30+
logger = log.WithSourceIPs(source, logger)
31+
}
32+
}
33+
req, err := remote.DecodeOTLPWriteRequest(r)
34+
if err != nil {
35+
level.Error(logger).Log("err", err.Error())
36+
http.Error(w, err.Error(), http.StatusBadRequest)
37+
return
38+
}
39+
40+
tsMap, err := prometheusremotewrite.FromMetrics(convertToMetricsAttributes(req.Metrics()), prometheusremotewrite.Settings{DisableTargetInfo: true})
41+
if err != nil {
42+
level.Error(logger).Log("err", err.Error())
43+
http.Error(w, err.Error(), http.StatusBadRequest)
44+
return
45+
}
46+
47+
prwReq := cortexpb.WriteRequest{
48+
Source: cortexpb.API,
49+
Metadata: nil,
50+
SkipLabelNameValidation: false,
51+
}
52+
53+
tsList := []cortexpb.PreallocTimeseries(nil)
54+
for _, v := range tsMap {
55+
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
56+
Labels: makeLabels(v.Labels),
57+
Samples: makeSamples(v.Samples),
58+
Exemplars: makeExemplars(v.Exemplars),
59+
}})
60+
}
61+
prwReq.Timeseries = tsList
62+
63+
if _, err := push(ctx, &prwReq); err != nil {
64+
resp, ok := httpgrpc.HTTPResponseFromError(err)
65+
if !ok {
66+
http.Error(w, err.Error(), http.StatusInternalServerError)
67+
return
68+
}
69+
if resp.GetCode()/100 == 5 {
70+
level.Error(logger).Log("msg", "push error", "err", err)
71+
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
72+
level.Warn(logger).Log("msg", "push refused", "err", err)
73+
}
74+
http.Error(w, string(resp.Body), int(resp.Code))
75+
}
76+
})
77+
}
78+
79+
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
80+
out := make(labels.Labels, 0, len(in))
81+
for _, l := range in {
82+
out = append(out, labels.Label{Name: l.Name, Value: l.Value})
83+
}
84+
return cortexpb.FromLabelsToLabelAdapters(out)
85+
}
86+
87+
func makeSamples(in []prompb.Sample) []cortexpb.Sample {
88+
out := make([]cortexpb.Sample, 0, len(in))
89+
for _, s := range in {
90+
out = append(out, cortexpb.Sample{
91+
Value: s.Value,
92+
TimestampMs: s.Timestamp,
93+
})
94+
}
95+
return out
96+
}
97+
98+
func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar {
99+
out := make([]cortexpb.Exemplar, 0, len(in))
100+
for _, e := range in {
101+
out = append(out, cortexpb.Exemplar{
102+
Labels: makeLabels(e.Labels),
103+
Value: e.Value,
104+
TimestampMs: e.Timestamp,
105+
})
106+
}
107+
return out
108+
}
109+
110+
func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics {
111+
cloneMd := pmetric.NewMetrics()
112+
md.CopyTo(cloneMd)
113+
rms := cloneMd.ResourceMetrics()
114+
for i := 0; i < rms.Len(); i++ {
115+
resource := rms.At(i).Resource()
116+
117+
ilms := rms.At(i).ScopeMetrics()
118+
for j := 0; j < ilms.Len(); j++ {
119+
ilm := ilms.At(j)
120+
metricSlice := ilm.Metrics()
121+
for k := 0; k < metricSlice.Len(); k++ {
122+
addAttributesToMetric(metricSlice.At(k), resource.Attributes())
123+
}
124+
}
125+
}
126+
return cloneMd
127+
}
128+
129+
// addAttributesToMetric adds additional labels to the given metric
130+
func addAttributesToMetric(metric pmetric.Metric, labelMap pcommon.Map) {
131+
switch metric.Type() {
132+
case pmetric.MetricTypeGauge:
133+
addAttributesToNumberDataPoints(metric.Gauge().DataPoints(), labelMap)
134+
case pmetric.MetricTypeSum:
135+
addAttributesToNumberDataPoints(metric.Sum().DataPoints(), labelMap)
136+
case pmetric.MetricTypeHistogram:
137+
addAttributesToHistogramDataPoints(metric.Histogram().DataPoints(), labelMap)
138+
case pmetric.MetricTypeSummary:
139+
addAttributesToSummaryDataPoints(metric.Summary().DataPoints(), labelMap)
140+
case pmetric.MetricTypeExponentialHistogram:
141+
addAttributesToExponentialHistogramDataPoints(metric.ExponentialHistogram().DataPoints(), labelMap)
142+
}
143+
}
144+
145+
func addAttributesToNumberDataPoints(ps pmetric.NumberDataPointSlice, newAttributeMap pcommon.Map) {
146+
for i := 0; i < ps.Len(); i++ {
147+
joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
148+
}
149+
}
150+
151+
func addAttributesToHistogramDataPoints(ps pmetric.HistogramDataPointSlice, newAttributeMap pcommon.Map) {
152+
for i := 0; i < ps.Len(); i++ {
153+
joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
154+
}
155+
}
156+
157+
func addAttributesToSummaryDataPoints(ps pmetric.SummaryDataPointSlice, newAttributeMap pcommon.Map) {
158+
for i := 0; i < ps.Len(); i++ {
159+
joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
160+
}
161+
}
162+
163+
func addAttributesToExponentialHistogramDataPoints(ps pmetric.ExponentialHistogramDataPointSlice, newAttributeMap pcommon.Map) {
164+
for i := 0; i < ps.Len(); i++ {
165+
joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
166+
}
167+
}
168+
169+
func joinAttributeMaps(from, to pcommon.Map) {
170+
from.Range(func(k string, v pcommon.Value) bool {
171+
v.CopyTo(to.PutEmpty(k))
172+
return true
173+
})
174+
}

0 commit comments

Comments
 (0)