Skip to content

Commit fc8df4d

Browse files
committed
Start implementing direct unmarshal from RW2 into mimirpb.Timeseries
Signed-off-by: György Krajcsovits <[email protected]>
1 parent 2d996c2 commit fc8df4d

File tree

6 files changed

+127
-124
lines changed

6 files changed

+127
-124
lines changed

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8543,7 +8543,7 @@ func cloneTimeseries(orig *mimirpb.TimeSeries) (*mimirpb.TimeSeries, error) {
85438543
}
85448544

85458545
cloned := &mimirpb.TimeSeries{}
8546-
err = cloned.Unmarshal(data)
8546+
err = cloned.Unmarshal(data, nil)
85478547
return cloned, err
85488548
}
85498549

pkg/ingester/client/ingester.pb.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/mimirpb/compat_rw2_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package mimirpb
4+
5+
import (
6+
"testing"
7+
8+
rw2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
9+
"github.com/stretchr/testify/require"
10+
11+
rw2util "github.com/grafana/mimir/pkg/util/test"
12+
)
13+
14+
func TestXxx(t *testing.T) {
15+
syms := rw2util.NewSymbolTableBuilder(nil)
16+
17+
// Create a new WriteRequest with some sample data.
18+
writeRequest := &rw2.Request{
19+
Timeseries: []rw2.TimeSeries{
20+
{
21+
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
22+
Samples: []rw2.Sample{
23+
{
24+
Value: 123.456,
25+
Timestamp: 1234567890,
26+
},
27+
},
28+
},
29+
},
30+
}
31+
writeRequest.Symbols = syms.GetSymbols()
32+
data, err := writeRequest.Marshal()
33+
require.NoError(t, err)
34+
35+
// Unmarshal the data back into Mimir's WriteRequest.
36+
received := PreallocWriteRequest{}
37+
received.UnmarshalFromRW2 = true
38+
err = received.Unmarshal(data)
39+
require.NoError(t, err)
40+
41+
// Check that the unmarshalled data matches the original data.
42+
require.Len(t,received.Timeseries, 1)
43+
require.Len(t, received.Timeseries[0].TimeSeries.Labels, 2)
44+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[0].Name, "__name__")
45+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[0].Value, "test_metric")
46+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[1].Name, "job")
47+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[1].Value, "test_job")
48+
require.Len(t, received.Timeseries[0].TimeSeries.Samples, 1)
49+
require.Equal(t, received.Timeseries[0].TimeSeries.Samples[0].Value, 123.456)
50+
require.Equal(t, received.Timeseries[0].TimeSeries.Samples[0].TimestampMs, 1234567890)
51+
}

pkg/mimirpb/mimir.pb.go

Lines changed: 61 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/mimirpb/timeseries.go

Lines changed: 9 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -74,92 +74,11 @@ type PreallocWriteRequest struct {
7474
func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {
7575
p.Timeseries = PreallocTimeseriesSliceFromPool()
7676
p.WriteRequest.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
77-
78-
if p.UnmarshalFromRW2 {
79-
return p.unmarshalRW2(dAtA)
80-
}
77+
p.WriteRequest.UnmarshalFromRW2 = p.UnmarshalFromRW2
8178

8279
return p.WriteRequest.Unmarshal(dAtA)
8380
}
8481

85-
// UnmarshalRW2 unmarshals the given remote write 2.0 data and converts it to a WriteRequest.
86-
func (p *PreallocWriteRequest) unmarshalRW2(data []byte) error {
87-
rw2req := &WriteRequestRW2{}
88-
rw2req.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
89-
if err := rw2req.Unmarshal(data); err != nil {
90-
return err
91-
}
92-
93-
metricFamilies := map[string]*MetricMetadata{}
94-
95-
for _, ts := range rw2req.Timeseries {
96-
p.Timeseries = append(p.Timeseries, PreallocTimeseries{})
97-
p.Timeseries[len(p.Timeseries)-1].TimeSeries = TimeseriesFromPool()
98-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.CreatedTimestamp = ts.CreatedTimestamp
99-
var err error
100-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Labels, err = labelRefsToLabelAdapter(ts.LabelsRefs, rw2req.Symbols)
101-
if err != nil {
102-
return err
103-
}
104-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Samples = ts.Samples
105-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Histograms = ts.Histograms
106-
if len(ts.Exemplars) > 0 {
107-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars = make([]Exemplar, 0, len(ts.Exemplars))
108-
for i := range ts.Exemplars {
109-
lbls, err := labelRefsToLabelAdapter(ts.Exemplars[i].LabelsRefs, rw2req.Symbols)
110-
if err != nil {
111-
return err
112-
}
113-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars = append(p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars, Exemplar{
114-
Labels: lbls,
115-
Value: ts.Exemplars[i].Value,
116-
TimestampMs: ts.Exemplars[i].Timestamp,
117-
})
118-
}
119-
}
120-
121-
// Convert RW2 metadata to RW1 metadata.
122-
seriesName := getSeriesName(p.Timeseries[len(p.Timeseries)-1].TimeSeries.Labels)
123-
if seriesName == "" {
124-
continue
125-
}
126-
metricFamily, _ := getMetricName(seriesName, ts.Metadata.Type)
127-
if metricFamily == "" {
128-
continue
129-
}
130-
help, _ := getSymbol(ts.Metadata.HelpRef, rw2req.Symbols)
131-
unit, _ := getSymbol(ts.Metadata.UnitRef, rw2req.Symbols)
132-
133-
if ts.Metadata.Type == METRIC_TYPE_UNSPECIFIED && help == "" && unit == "" {
134-
// Nothing to do here.
135-
continue
136-
}
137-
138-
metricFamilies[metricFamily] = &MetricMetadata{
139-
Type: MetricMetadata_MetricType(ts.Metadata.Type),
140-
MetricFamilyName: metricFamily,
141-
Help: help,
142-
Unit: unit,
143-
}
144-
}
145-
146-
// Fill the metadata
147-
p.Metadata = make([]*MetricMetadata, 0, len(metricFamilies))
148-
for _, metadata := range metricFamilies {
149-
p.Metadata = append(p.Metadata, metadata)
150-
}
151-
152-
return nil
153-
}
154-
155-
// getSymbol resolves the symbol reference to a string.
156-
func getSymbol(ref uint32, symbols []string) (string, error) {
157-
if ref < uint32(len(symbols)) {
158-
return symbols[ref], nil
159-
}
160-
return "", fmt.Errorf("symbol reference %d is out of bounds", ref)
161-
}
162-
16382
// labelRefsToLabelAdapter converts a slice of label references to a slice
16483
// of LabelAdapter.
16584
func labelRefsToLabelAdapter(refs []uint32, symbols []string) ([]LabelAdapter, error) {
@@ -168,11 +87,11 @@ func labelRefsToLabelAdapter(refs []uint32, symbols []string) ([]LabelAdapter, e
16887
}
16988
labels := make([]LabelAdapter, 0, len(refs)/2)
17089
for i := 0; i < len(refs); i += 2 {
171-
name, err := getSymbol(refs[i], symbols)
90+
name, err := getRW2Symbol(refs[i], symbols)
17291
if err != nil {
17392
return nil, err
17493
}
175-
value, err := getSymbol(refs[i+1], symbols)
94+
value, err := getRW2Symbol(refs[i+1], symbols)
17695
if err != nil {
17796
return nil, err
17897
}
@@ -254,6 +173,7 @@ type PreallocTimeseries struct {
254173
marshalledData []byte
255174

256175
skipUnmarshalingExemplars bool
176+
unmarshalFromRW2 bool
257177
}
258178

259179
// RemoveLabel removes the label labelName from this timeseries, if it exists.
@@ -376,13 +296,15 @@ var TimeseriesUnmarshalCachingEnabled = true
376296
// Unmarshal implements proto.Message. Input data slice is retained.
377297
// Copied from the protobuf generated code, the only change is that in case 3 the exemplars don't get unmarshaled
378298
// if p.skipUnmarshalingExemplars is false.
379-
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
380-
if TimeseriesUnmarshalCachingEnabled {
299+
func (p *PreallocTimeseries) Unmarshal(dAtA []byte, symbols []string) error {
300+
if TimeseriesUnmarshalCachingEnabled && !p.unmarshalFromRW2 {
301+
// TODO(krajorama): check if it makes sense for RW2 as well.
381302
p.marshalledData = dAtA
382303
}
383304
p.TimeSeries = TimeseriesFromPool()
384305
p.TimeSeries.SkipUnmarshalingExemplars = p.skipUnmarshalingExemplars
385-
return p.TimeSeries.Unmarshal(dAtA)
306+
p.TimeSeries.UnmarshalFromRW2 = p.unmarshalFromRW2
307+
return p.TimeSeries.Unmarshal(dAtA, symbols)
386308
}
387309

388310
func (p *PreallocTimeseries) Size() int {

0 commit comments

Comments
 (0)