Skip to content

Commit ee6f843

Browse files
committed
Start implementing direct unmarshal from RW2 into mimirpb.Timeseries
Signed-off-by: György Krajcsovits <[email protected]> # Conflicts: # pkg/mimirpb/compat_rw2_test.go
1 parent 8cbf635 commit ee6f843

File tree

6 files changed

+115
-124
lines changed

6 files changed

+115
-124
lines changed

pkg/distributor/distributor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8544,7 +8544,7 @@ func cloneTimeseries(orig *mimirpb.TimeSeries) (*mimirpb.TimeSeries, error) {
85448544
}
85458545

85468546
cloned := &mimirpb.TimeSeries{}
8547-
err = cloned.Unmarshal(data)
8547+
err = cloned.Unmarshal(data, nil)
85488548
return cloned, err
85498549
}
85508550

pkg/ingester/client/ingester.pb.go

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/mimirpb/compat_rw2_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,42 @@ func TestRW2TypesCompatible(t *testing.T) {
4545

4646
require.Equal(t, expectedTree.String(), actualTree.String(), "Proto types are not compatible")
4747
}
48+
49+
func TestXxx(t *testing.T) {
50+
syms := test.NewSymbolTableBuilder(nil)
51+
52+
// Create a new WriteRequest with some sample data.
53+
writeRequest := &rw2.Request{
54+
Timeseries: []rw2.TimeSeries{
55+
{
56+
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
57+
Samples: []rw2.Sample{
58+
{
59+
Value: 123.456,
60+
Timestamp: 1234567890,
61+
},
62+
},
63+
},
64+
},
65+
}
66+
writeRequest.Symbols = syms.GetSymbols()
67+
data, err := writeRequest.Marshal()
68+
require.NoError(t, err)
69+
70+
// Unmarshal the data back into Mimir's WriteRequest.
71+
received := PreallocWriteRequest{}
72+
received.UnmarshalFromRW2 = true
73+
err = received.Unmarshal(data)
74+
require.NoError(t, err)
75+
76+
// Check that the unmarshalled data matches the original data.
77+
require.Len(t,received.Timeseries, 1)
78+
require.Len(t, received.Timeseries[0].TimeSeries.Labels, 2)
79+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[0].Name, "__name__")
80+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[0].Value, "test_metric")
81+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[1].Name, "job")
82+
require.Equal(t, received.Timeseries[0].TimeSeries.Labels[1].Value, "test_job")
83+
require.Len(t, received.Timeseries[0].TimeSeries.Samples, 1)
84+
require.Equal(t, received.Timeseries[0].TimeSeries.Samples[0].Value, 123.456)
85+
require.Equal(t, received.Timeseries[0].TimeSeries.Samples[0].TimestampMs, 1234567890)
86+
}

pkg/mimirpb/mimir.pb.go

+61-31
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/mimirpb/timeseries.go

+9-87
Original file line numberDiff line numberDiff line change
@@ -74,92 +74,11 @@ type PreallocWriteRequest struct {
7474
func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {
7575
p.Timeseries = PreallocTimeseriesSliceFromPool()
7676
p.WriteRequest.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
77-
78-
if p.UnmarshalFromRW2 {
79-
return p.unmarshalRW2(dAtA)
80-
}
77+
p.WriteRequest.UnmarshalFromRW2 = p.UnmarshalFromRW2
8178

8279
return p.WriteRequest.Unmarshal(dAtA)
8380
}
8481

85-
// UnmarshalRW2 unmarshals the given remote write 2.0 data and converts it to a WriteRequest.
86-
func (p *PreallocWriteRequest) unmarshalRW2(data []byte) error {
87-
rw2req := &WriteRequestRW2{}
88-
rw2req.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
89-
if err := rw2req.Unmarshal(data); err != nil {
90-
return err
91-
}
92-
93-
metricFamilies := map[string]*MetricMetadata{}
94-
95-
for _, ts := range rw2req.Timeseries {
96-
p.Timeseries = append(p.Timeseries, PreallocTimeseries{})
97-
p.Timeseries[len(p.Timeseries)-1].TimeSeries = TimeseriesFromPool()
98-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.CreatedTimestamp = ts.CreatedTimestamp
99-
var err error
100-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Labels, err = labelRefsToLabelAdapter(ts.LabelsRefs, rw2req.Symbols)
101-
if err != nil {
102-
return err
103-
}
104-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Samples = ts.Samples
105-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Histograms = ts.Histograms
106-
if len(ts.Exemplars) > 0 {
107-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars = make([]Exemplar, 0, len(ts.Exemplars))
108-
for i := range ts.Exemplars {
109-
lbls, err := labelRefsToLabelAdapter(ts.Exemplars[i].LabelsRefs, rw2req.Symbols)
110-
if err != nil {
111-
return err
112-
}
113-
p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars = append(p.Timeseries[len(p.Timeseries)-1].TimeSeries.Exemplars, Exemplar{
114-
Labels: lbls,
115-
Value: ts.Exemplars[i].Value,
116-
TimestampMs: ts.Exemplars[i].Timestamp,
117-
})
118-
}
119-
}
120-
121-
// Convert RW2 metadata to RW1 metadata.
122-
seriesName := getSeriesName(p.Timeseries[len(p.Timeseries)-1].TimeSeries.Labels)
123-
if seriesName == "" {
124-
continue
125-
}
126-
metricFamily, _ := getMetricName(seriesName, ts.Metadata.Type)
127-
if metricFamily == "" {
128-
continue
129-
}
130-
help, _ := getSymbol(ts.Metadata.HelpRef, rw2req.Symbols)
131-
unit, _ := getSymbol(ts.Metadata.UnitRef, rw2req.Symbols)
132-
133-
if ts.Metadata.Type == METRIC_TYPE_UNSPECIFIED && help == "" && unit == "" {
134-
// Nothing to do here.
135-
continue
136-
}
137-
138-
metricFamilies[metricFamily] = &MetricMetadata{
139-
Type: MetricMetadata_MetricType(ts.Metadata.Type),
140-
MetricFamilyName: metricFamily,
141-
Help: help,
142-
Unit: unit,
143-
}
144-
}
145-
146-
// Fill the metadata
147-
p.Metadata = make([]*MetricMetadata, 0, len(metricFamilies))
148-
for _, metadata := range metricFamilies {
149-
p.Metadata = append(p.Metadata, metadata)
150-
}
151-
152-
return nil
153-
}
154-
155-
// getSymbol resolves the symbol reference to a string.
156-
func getSymbol(ref uint32, symbols []string) (string, error) {
157-
if ref < uint32(len(symbols)) {
158-
return symbols[ref], nil
159-
}
160-
return "", fmt.Errorf("symbol reference %d is out of bounds", ref)
161-
}
162-
16382
// labelRefsToLabelAdapter converts a slice of label references to a slice
16483
// of LabelAdapter.
16584
func labelRefsToLabelAdapter(refs []uint32, symbols []string) ([]LabelAdapter, error) {
@@ -168,11 +87,11 @@ func labelRefsToLabelAdapter(refs []uint32, symbols []string) ([]LabelAdapter, e
16887
}
16988
labels := make([]LabelAdapter, 0, len(refs)/2)
17089
for i := 0; i < len(refs); i += 2 {
171-
name, err := getSymbol(refs[i], symbols)
90+
name, err := getRW2Symbol(refs[i], symbols)
17291
if err != nil {
17392
return nil, err
17493
}
175-
value, err := getSymbol(refs[i+1], symbols)
94+
value, err := getRW2Symbol(refs[i+1], symbols)
17695
if err != nil {
17796
return nil, err
17897
}
@@ -254,6 +173,7 @@ type PreallocTimeseries struct {
254173
marshalledData []byte
255174

256175
skipUnmarshalingExemplars bool
176+
unmarshalFromRW2 bool
257177
}
258178

259179
// RemoveLabel removes the label labelName from this timeseries, if it exists.
@@ -376,13 +296,15 @@ var TimeseriesUnmarshalCachingEnabled = true
376296
// Unmarshal implements proto.Message. Input data slice is retained.
377297
// Copied from the protobuf generated code, the only change is that in case 3 the exemplars don't get unmarshaled
378298
// if p.skipUnmarshalingExemplars is false.
379-
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
380-
if TimeseriesUnmarshalCachingEnabled {
299+
func (p *PreallocTimeseries) Unmarshal(dAtA []byte, symbols []string) error {
300+
if TimeseriesUnmarshalCachingEnabled && !p.unmarshalFromRW2 {
301+
// TODO(krajorama): check if it makes sense for RW2 as well.
381302
p.marshalledData = dAtA
382303
}
383304
p.TimeSeries = TimeseriesFromPool()
384305
p.TimeSeries.SkipUnmarshalingExemplars = p.skipUnmarshalingExemplars
385-
return p.TimeSeries.Unmarshal(dAtA)
306+
p.TimeSeries.UnmarshalFromRW2 = p.unmarshalFromRW2
307+
return p.TimeSeries.Unmarshal(dAtA, symbols)
386308
}
387309

388310
func (p *PreallocTimeseries) Size() int {

pkg/mimirpb/timeseries_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,13 @@ func TestPreallocTimeseries_Unmarshal(t *testing.T) {
302302

303303
TimeseriesUnmarshalCachingEnabled = false
304304

305-
require.NoError(t, msg.Unmarshal(data))
305+
require.NoError(t, msg.Unmarshal(data, nil))
306306
require.True(t, src.Equal(msg.TimeSeries))
307307
require.Nil(t, msg.marshalledData)
308308

309309
TimeseriesUnmarshalCachingEnabled = true
310310

311-
require.NoError(t, msg.Unmarshal(data))
311+
require.NoError(t, msg.Unmarshal(data, nil))
312312
require.True(t, src.Equal(msg.TimeSeries))
313313
require.NotNil(t, msg.marshalledData)
314314
}

0 commit comments

Comments
 (0)