Skip to content

Commit 57c00a5

Browse files
authored
ingest-storage: Add consumer support for a new record version 2, based on Remote Write 2.0 (#11406)
* Allow alternate deserializers * Use abstraction in blockbuilder * Naively deserialize plain rw2 * Add benchmark * solve compiler optimization in benchmark * Add an offset to symbols * tests for offsets * simplify * Deserializer tests * v2 covers histograms * Extend tests to cover metadata * update misc tests * linter * fix nits
1 parent b73574f commit 57c00a5

File tree

10 files changed

+464
-78
lines changed

10 files changed

+464
-78
lines changed

pkg/blockbuilder/tsdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
9494
}
9595

9696
// TODO(codesome): see if we can skip parsing exemplars. They are not persisted in the block so we can save some parsing here.
97-
err = req.Unmarshal(rec.Value)
97+
err = ingest.DeserializeRecordContent(rec.Value, &req, version)
9898
if err != nil {
9999
return false, fmt.Errorf("unmarshal record key %s: %w", rec.Key, err)
100100
}

pkg/mimirpb/compat_rw2.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ const rw2SymbolPageSize = 16
3232
// mechanism, we would have to allocate a large amount of memory
3333
// or do reallocation. This is a compromise between the two.
3434
type rw2PagedSymbols struct {
35-
count uint32
36-
pages []*[]string
35+
count uint32
36+
pages []*[]string
37+
offset uint32
3738
}
3839

3940
func (ps *rw2PagedSymbols) append(symbol string) {
@@ -55,6 +56,7 @@ func (ps *rw2PagedSymbols) releasePages() {
5556
}
5657

5758
func (ps *rw2PagedSymbols) get(ref uint32) (string, error) {
59+
ref = ref - ps.offset
5860
if ref < ps.count {
5961
page := ps.pages[ref>>rw2SymbolPageSize]
6062
return (*page)[ref&((1<<rw2SymbolPageSize)-1)], nil

pkg/mimirpb/compat_rw2_test.go

Lines changed: 170 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,175 @@ func TestRW2TypesCompatible(t *testing.T) {
4747
}
4848

4949
func TestRW2Unmarshal(t *testing.T) {
50-
syms := test.NewSymbolTableBuilder(nil)
50+
t.Run("rw2 compatible produces expected WriteRequest", func(t *testing.T) {
51+
syms := test.NewSymbolTableBuilder(nil)
52+
// Create a new WriteRequest with some sample data.
53+
writeRequest := makeTestRW2WriteRequest(syms)
54+
data, err := writeRequest.Marshal()
55+
require.NoError(t, err)
5156

52-
// Create a new WriteRequest with some sample data.
53-
writeRequest := &rw2.Request{
57+
// Unmarshal the data back into Mimir's WriteRequest.
58+
received := PreallocWriteRequest{}
59+
received.UnmarshalFromRW2 = true
60+
err = received.Unmarshal(data)
61+
require.NoError(t, err)
62+
63+
expected := &PreallocWriteRequest{
64+
WriteRequest: WriteRequest{
65+
Timeseries: []PreallocTimeseries{
66+
{
67+
TimeSeries: &TimeSeries{
68+
Labels: []LabelAdapter{
69+
{
70+
Name: "__name__",
71+
Value: "test_metric_total",
72+
},
73+
{
74+
Name: "job",
75+
Value: "test_job",
76+
},
77+
},
78+
Samples: []Sample{
79+
{
80+
Value: 123.456,
81+
TimestampMs: 1234567890,
82+
},
83+
},
84+
Exemplars: []Exemplar{
85+
{
86+
Value: 123.456,
87+
TimestampMs: 1234567890,
88+
Labels: []LabelAdapter{
89+
{
90+
Name: "__name__",
91+
Value: "test_metric_total",
92+
},
93+
{
94+
Name: "traceID",
95+
Value: "1234567890abcdef",
96+
},
97+
},
98+
},
99+
},
100+
},
101+
},
102+
},
103+
Metadata: []*MetricMetadata{
104+
{
105+
MetricFamilyName: "test_metric_total",
106+
Type: COUNTER,
107+
Help: "test_metric_help",
108+
Unit: "test_metric_unit",
109+
},
110+
},
111+
unmarshalFromRW2: true,
112+
},
113+
UnmarshalFromRW2: true,
114+
}
115+
116+
// Check that the unmarshalled data matches the original data.
117+
require.Equal(t, expected, &received)
118+
})
119+
120+
t.Run("rw2 with offset produces expected WriteRequest", func(t *testing.T) {
121+
syms := test.NewSymbolTableBuilderWithOffset(nil, 256)
122+
// Create a new WriteRequest with some sample data.
123+
writeRequest := makeTestRW2WriteRequest(syms)
124+
data, err := writeRequest.Marshal()
125+
require.NoError(t, err)
126+
127+
// Unmarshal the data back into Mimir's WriteRequest.
128+
received := PreallocWriteRequest{}
129+
received.UnmarshalFromRW2 = true
130+
received.RW2SymbolOffset = 256
131+
err = received.Unmarshal(data)
132+
require.NoError(t, err)
133+
134+
expected := &PreallocWriteRequest{
135+
WriteRequest: WriteRequest{
136+
Timeseries: []PreallocTimeseries{
137+
{
138+
TimeSeries: &TimeSeries{
139+
Labels: []LabelAdapter{
140+
{
141+
Name: "__name__",
142+
Value: "test_metric_total",
143+
},
144+
{
145+
Name: "job",
146+
Value: "test_job",
147+
},
148+
},
149+
Samples: []Sample{
150+
{
151+
Value: 123.456,
152+
TimestampMs: 1234567890,
153+
},
154+
},
155+
Exemplars: []Exemplar{
156+
{
157+
Value: 123.456,
158+
TimestampMs: 1234567890,
159+
Labels: []LabelAdapter{
160+
{
161+
Name: "__name__",
162+
Value: "test_metric_total",
163+
},
164+
{
165+
Name: "traceID",
166+
Value: "1234567890abcdef",
167+
},
168+
},
169+
},
170+
},
171+
},
172+
},
173+
},
174+
Metadata: []*MetricMetadata{
175+
{
176+
MetricFamilyName: "test_metric_total",
177+
Type: COUNTER,
178+
Help: "test_metric_help",
179+
Unit: "test_metric_unit",
180+
},
181+
},
182+
unmarshalFromRW2: true,
183+
rw2symbols: rw2PagedSymbols{offset: 256},
184+
},
185+
UnmarshalFromRW2: true,
186+
RW2SymbolOffset: 256,
187+
}
188+
189+
// Check that the unmarshalled data matches the original data.
190+
require.Equal(t, expected, &received)
191+
})
192+
193+
t.Run("wrong offset fails to unmarshal", func(t *testing.T) {
194+
syms := test.NewSymbolTableBuilderWithOffset(nil, 256)
195+
// Create a new WriteRequest with some sample data.
196+
writeRequest := makeTestRW2WriteRequest(syms)
197+
data, err := writeRequest.Marshal()
198+
require.NoError(t, err)
199+
200+
// Unmarshal the data back into Mimir's WriteRequest.
201+
received := PreallocWriteRequest{}
202+
received.UnmarshalFromRW2 = true
203+
received.RW2SymbolOffset = 257
204+
err = received.Unmarshal(data)
205+
require.ErrorContains(t, err, "invalid")
206+
207+
// Unmarshal the data back into Mimir's WriteRequest.
208+
received = PreallocWriteRequest{}
209+
received.UnmarshalFromRW2 = true
210+
received.RW2SymbolOffset = 255
211+
err = received.Unmarshal(data)
212+
213+
require.ErrorContains(t, err, "invalid")
214+
})
215+
}
216+
217+
func makeTestRW2WriteRequest(syms *test.SymbolTableBuilder) *rw2.Request {
218+
req := &rw2.Request{
54219
Timeseries: []rw2.TimeSeries{
55220
{
56221
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
@@ -75,69 +240,6 @@ func TestRW2Unmarshal(t *testing.T) {
75240
},
76241
},
77242
}
78-
writeRequest.Symbols = syms.GetSymbols()
79-
data, err := writeRequest.Marshal()
80-
require.NoError(t, err)
81-
82-
// Unmarshal the data back into Mimir's WriteRequest.
83-
received := PreallocWriteRequest{}
84-
received.UnmarshalFromRW2 = true
85-
err = received.Unmarshal(data)
86-
require.NoError(t, err)
87-
88-
expected := &PreallocWriteRequest{
89-
WriteRequest: WriteRequest{
90-
Timeseries: []PreallocTimeseries{
91-
{
92-
TimeSeries: &TimeSeries{
93-
Labels: []LabelAdapter{
94-
{
95-
Name: "__name__",
96-
Value: "test_metric_total",
97-
},
98-
{
99-
Name: "job",
100-
Value: "test_job",
101-
},
102-
},
103-
Samples: []Sample{
104-
{
105-
Value: 123.456,
106-
TimestampMs: 1234567890,
107-
},
108-
},
109-
Exemplars: []Exemplar{
110-
{
111-
Value: 123.456,
112-
TimestampMs: 1234567890,
113-
Labels: []LabelAdapter{
114-
{
115-
Name: "__name__",
116-
Value: "test_metric_total",
117-
},
118-
{
119-
Name: "traceID",
120-
Value: "1234567890abcdef",
121-
},
122-
},
123-
},
124-
},
125-
},
126-
},
127-
},
128-
Metadata: []*MetricMetadata{
129-
{
130-
MetricFamilyName: "test_metric_total",
131-
Type: COUNTER,
132-
Help: "test_metric_help",
133-
Unit: "test_metric_unit",
134-
},
135-
},
136-
unmarshalFromRW2: true,
137-
},
138-
UnmarshalFromRW2: true,
139-
}
140-
141-
// Check that the unmarshalled data matches the original data.
142-
require.Equal(t, expected, &received)
243+
req.Symbols = syms.GetSymbols()
244+
return req
143245
}

pkg/mimirpb/timeseries.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ type PreallocWriteRequest struct {
6666

6767
// UnmarshalRW2 is set to true if the Unmarshal method should unmarshal the data as a remote write 2.0 message.
6868
UnmarshalFromRW2 bool
69+
70+
// RW2SymbolOffset is an optimization used for RW2-adjacent applications where typical symbol refs are shifted by an offset.
71+
// This allows certain symbols to be reserved without being present in the symbols list.
72+
RW2SymbolOffset uint32
6973
}
7074

7175
// Unmarshal implements proto.Message.
@@ -75,6 +79,7 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {
7579
p.Timeseries = PreallocTimeseriesSliceFromPool()
7680
p.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
7781
p.unmarshalFromRW2 = p.UnmarshalFromRW2
82+
p.rw2symbols.offset = p.RW2SymbolOffset
7883
return p.WriteRequest.Unmarshal(dAtA)
7984
}
8085

pkg/storage/ingest/pusher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
104104
}
105105

106106
// We don't free the WriteRequest slices because they are being freed by a level below.
107-
err := parsed.Unmarshal(r.content)
107+
err := DeserializeRecordContent(r.content, parsed.PreallocWriteRequest, r.version)
108108
if err != nil {
109109
parsed.err = fmt.Errorf("parsing ingest consumer write request: %w", err)
110110
}

pkg/storage/ingest/pusher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func TestPusherConsumer(t *testing.T) {
140140
expectedWRs: writeReqs[0:2],
141141
expErr: "",
142142
expectedLogLines: []string{
143-
"level=error msg=\"failed to parse write request; skipping\" err=\"received a record with an unsupported version: 101, max supported version: 1\"",
143+
"level=error msg=\"failed to parse write request; skipping\" err=\"parsing ingest consumer write request: received a record with an unsupported version: 101, max supported version: 2\"",
144144
},
145145
},
146146
"failed processing of record": {

pkg/storage/ingest/version.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313

1414
const (
1515
RecordVersionHeaderKey = "Version"
16-
LatestRecordVersion = 1
16+
LatestRecordVersion = 2
17+
V2RecordSymbolOffset = 64
1718
)
1819

1920
func ValidateRecordVersion(version int) error {
@@ -84,3 +85,27 @@ func (v versionOneRecordSerializer) ToRecords(partitionID int32, tenantID string
8485
}
8586
return records, nil
8687
}
88+
89+
func DeserializeRecordContent(content []byte, wr *mimirpb.PreallocWriteRequest, version int) error {
90+
switch version {
91+
case 0:
92+
// V0 is body-compatible with V1.
93+
fallthrough
94+
case 1:
95+
return deserializeRecordContentV1(content, wr)
96+
case 2:
97+
return deserializeRecordContentV2(content, wr)
98+
default:
99+
return fmt.Errorf("received a record with an unsupported version: %d, max supported version: %d", version, LatestRecordVersion)
100+
}
101+
}
102+
103+
func deserializeRecordContentV1(content []byte, wr *mimirpb.PreallocWriteRequest) error {
104+
return wr.Unmarshal(content)
105+
}
106+
107+
func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest) error {
108+
wr.UnmarshalFromRW2 = true
109+
wr.RW2SymbolOffset = V2RecordSymbolOffset
110+
return wr.Unmarshal(content)
111+
}

0 commit comments

Comments
 (0)