Skip to content

ingest-storage: Add consumer support for a new record version 2, based on Remote Write 2.0 #11406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 23, 2025
2 changes: 1 addition & 1 deletion pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
}

// TODO(codesome): see if we can skip parsing exemplars. They are not persisted in the block so we can save some parsing here.
err = req.Unmarshal(rec.Value)
err = ingest.DeserializeRecordContent(rec.Value, &req, version)
if err != nil {
return false, fmt.Errorf("unmarshal record key %s: %w", rec.Key, err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/mimirpb/compat_rw2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const rw2SymbolPageSize = 16
// mechanism, we would have to allocate a large amount of memory
// or do reallocation. This is a compromise between the two.
type rw2PagedSymbols struct {
count uint32
pages []*[]string
count uint32
pages []*[]string
offset uint32
}

func (ps *rw2PagedSymbols) append(symbol string) {
Expand All @@ -55,6 +56,7 @@ func (ps *rw2PagedSymbols) releasePages() {
}

func (ps *rw2PagedSymbols) get(ref uint32) (string, error) {
ref = ref - ps.offset
if ref < ps.count {
page := ps.pages[ref>>rw2SymbolPageSize]
return (*page)[ref&((1<<rw2SymbolPageSize)-1)], nil
Expand Down
238 changes: 170 additions & 68 deletions pkg/mimirpb/compat_rw2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,175 @@ func TestRW2TypesCompatible(t *testing.T) {
}

func TestRW2Unmarshal(t *testing.T) {
syms := test.NewSymbolTableBuilder(nil)
t.Run("rw2 compatible produces expected WriteRequest", func(t *testing.T) {
syms := test.NewSymbolTableBuilder(nil)
// Create a new WriteRequest with some sample data.
writeRequest := makeTestRW2WriteRequest(syms)
data, err := writeRequest.Marshal()
require.NoError(t, err)

// Create a new WriteRequest with some sample data.
writeRequest := &rw2.Request{
// Unmarshal the data back into Mimir's WriteRequest.
received := PreallocWriteRequest{}
received.UnmarshalFromRW2 = true
err = received.Unmarshal(data)
require.NoError(t, err)

expected := &PreallocWriteRequest{
WriteRequest: WriteRequest{
Timeseries: []PreallocTimeseries{
{
TimeSeries: &TimeSeries{
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "job",
Value: "test_job",
},
},
Samples: []Sample{
{
Value: 123.456,
TimestampMs: 1234567890,
},
},
Exemplars: []Exemplar{
{
Value: 123.456,
TimestampMs: 1234567890,
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "traceID",
Value: "1234567890abcdef",
},
},
},
},
},
},
},
Metadata: []*MetricMetadata{
{
MetricFamilyName: "test_metric_total",
Type: COUNTER,
Help: "test_metric_help",
Unit: "test_metric_unit",
},
},
unmarshalFromRW2: true,
},
UnmarshalFromRW2: true,
}

// Check that the unmarshalled data matches the original data.
require.Equal(t, expected, &received)
})

t.Run("rw2 with offset produces expected WriteRequest", func(t *testing.T) {
syms := test.NewSymbolTableBuilderWithOffset(nil, 256)
// Create a new WriteRequest with some sample data.
writeRequest := makeTestRW2WriteRequest(syms)
data, err := writeRequest.Marshal()
require.NoError(t, err)

// Unmarshal the data back into Mimir's WriteRequest.
received := PreallocWriteRequest{}
received.UnmarshalFromRW2 = true
received.RW2SymbolOffset = 256
err = received.Unmarshal(data)
require.NoError(t, err)

expected := &PreallocWriteRequest{
WriteRequest: WriteRequest{
Timeseries: []PreallocTimeseries{
{
TimeSeries: &TimeSeries{
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "job",
Value: "test_job",
},
},
Samples: []Sample{
{
Value: 123.456,
TimestampMs: 1234567890,
},
},
Exemplars: []Exemplar{
{
Value: 123.456,
TimestampMs: 1234567890,
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "traceID",
Value: "1234567890abcdef",
},
},
},
},
},
},
},
Metadata: []*MetricMetadata{
{
MetricFamilyName: "test_metric_total",
Type: COUNTER,
Help: "test_metric_help",
Unit: "test_metric_unit",
},
},
unmarshalFromRW2: true,
rw2symbols: rw2PagedSymbols{offset: 256},
},
UnmarshalFromRW2: true,
RW2SymbolOffset: 256,
}

// Check that the unmarshalled data matches the original data.
require.Equal(t, expected, &received)
})

t.Run("wrong offset fails to unmarshal", func(t *testing.T) {
syms := test.NewSymbolTableBuilderWithOffset(nil, 256)
// Create a new WriteRequest with some sample data.
writeRequest := makeTestRW2WriteRequest(syms)
data, err := writeRequest.Marshal()
require.NoError(t, err)

// Unmarshal the data back into Mimir's WriteRequest.
received := PreallocWriteRequest{}
received.UnmarshalFromRW2 = true
received.RW2SymbolOffset = 257
err = received.Unmarshal(data)
require.ErrorContains(t, err, "invalid")

// Unmarshal the data back into Mimir's WriteRequest.
received = PreallocWriteRequest{}
received.UnmarshalFromRW2 = true
received.RW2SymbolOffset = 255
err = received.Unmarshal(data)

require.ErrorContains(t, err, "invalid")
})
}

func makeTestRW2WriteRequest(syms *test.SymbolTableBuilder) *rw2.Request {
req := &rw2.Request{
Timeseries: []rw2.TimeSeries{
{
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
Expand All @@ -75,69 +240,6 @@ func TestRW2Unmarshal(t *testing.T) {
},
},
}
writeRequest.Symbols = syms.GetSymbols()
data, err := writeRequest.Marshal()
require.NoError(t, err)

// Unmarshal the data back into Mimir's WriteRequest.
received := PreallocWriteRequest{}
received.UnmarshalFromRW2 = true
err = received.Unmarshal(data)
require.NoError(t, err)

expected := &PreallocWriteRequest{
WriteRequest: WriteRequest{
Timeseries: []PreallocTimeseries{
{
TimeSeries: &TimeSeries{
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "job",
Value: "test_job",
},
},
Samples: []Sample{
{
Value: 123.456,
TimestampMs: 1234567890,
},
},
Exemplars: []Exemplar{
{
Value: 123.456,
TimestampMs: 1234567890,
Labels: []LabelAdapter{
{
Name: "__name__",
Value: "test_metric_total",
},
{
Name: "traceID",
Value: "1234567890abcdef",
},
},
},
},
},
},
},
Metadata: []*MetricMetadata{
{
MetricFamilyName: "test_metric_total",
Type: COUNTER,
Help: "test_metric_help",
Unit: "test_metric_unit",
},
},
unmarshalFromRW2: true,
},
UnmarshalFromRW2: true,
}

// Check that the unmarshalled data matches the original data.
require.Equal(t, expected, &received)
req.Symbols = syms.GetSymbols()
return req
}
5 changes: 5 additions & 0 deletions pkg/mimirpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type PreallocWriteRequest struct {

// UnmarshalRW2 is set to true if the Unmarshal method should unmarshal the data as a remote write 2.0 message.
UnmarshalFromRW2 bool

// RW2SymbolOffset is an optimization used for RW2-adjacent applications where typical symbol refs are shifted by an offset.
// This allows certain symbols to be reserved without being present in the symbols list.
RW2SymbolOffset uint32
}

// Unmarshal implements proto.Message.
Expand All @@ -75,6 +79,7 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {
p.Timeseries = PreallocTimeseriesSliceFromPool()
p.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars
p.unmarshalFromRW2 = p.UnmarshalFromRW2
p.rw2symbols.offset = p.RW2SymbolOffset
return p.WriteRequest.Unmarshal(dAtA)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/ingest/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
}

// We don't free the WriteRequest slices because they are being freed by a level below.
err := parsed.Unmarshal(r.content)
err := DeserializeRecordContent(r.content, parsed.PreallocWriteRequest, r.version)
if err != nil {
parsed.err = fmt.Errorf("parsing ingest consumer write request: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/ingest/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestPusherConsumer(t *testing.T) {
expectedWRs: writeReqs[0:2],
expErr: "",
expectedLogLines: []string{
"level=error msg=\"failed to parse write request; skipping\" err=\"received a record with an unsupported version: 101, max supported version: 1\"",
"level=error msg=\"failed to parse write request; skipping\" err=\"parsing ingest consumer write request: received a record with an unsupported version: 101, max supported version: 2\"",
},
},
"failed processing of record": {
Expand Down
27 changes: 26 additions & 1 deletion pkg/storage/ingest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

const (
RecordVersionHeaderKey = "Version"
LatestRecordVersion = 1
LatestRecordVersion = 2
V2RecordSymbolOffset = 64
)

func ValidateRecordVersion(version int) error {
Expand Down Expand Up @@ -84,3 +85,27 @@ func (v versionOneRecordSerializer) ToRecords(partitionID int32, tenantID string
}
return records, nil
}

func DeserializeRecordContent(content []byte, wr *mimirpb.PreallocWriteRequest, version int) error {
switch version {
case 0:
// V0 is body-compatible with V1.
fallthrough
case 1:
return deserializeRecordContentV1(content, wr)
case 2:
return deserializeRecordContentV2(content, wr)
default:
return fmt.Errorf("received a record with an unsupported version: %d, max supported version: %d", version, LatestRecordVersion)
}
}

func deserializeRecordContentV1(content []byte, wr *mimirpb.PreallocWriteRequest) error {
return wr.Unmarshal(content)
}

func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest) error {
wr.UnmarshalFromRW2 = true
wr.RW2SymbolOffset = V2RecordSymbolOffset
return wr.Unmarshal(content)
}
Loading