
Commit 63a22b2

[chore] Bytes based batching for profiles (#12564)
#### Description
This PR implements serialized bytes based batching for profiles.

#### Link to tracking issue
#3262

Continuation of #12299 made by @sfc-gh-sili.

Related PRs:
- #12550
- #12519

Signed-off-by: Israel Blancas <[email protected]>
1 parent 7d854d5 commit 63a22b2

7 files changed: +437 -68 lines changed

@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for bytes-based batching for profiles in the exporterhelper package.
+
+# One or more tracking issues or pull requests related to the change
+issues: [3262]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
@@ -0,0 +1,53 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
+
+import (
+	"go.opentelemetry.io/collector/pdata/pprofile"
+)
+
+type ProfilesSizer interface {
+	ProfilesSize(pd pprofile.Profiles) int
+	ResourceProfilesSize(rp pprofile.ResourceProfiles) int
+	ScopeProfilesSize(sp pprofile.ScopeProfiles) int
+	ProfileSize(p pprofile.Profile) int
+	DeltaSize(newItemSize int) int
+}
+
+// ProfilesBytesSizer returns the byte size of serialized protos.
+type ProfilesBytesSizer struct {
+	pprofile.ProtoMarshaler
+	protoDeltaSizer
+}
+
+var _ ProfilesSizer = (*ProfilesBytesSizer)(nil)
+
+// ProfilesCountSizer returns the number of profiles in the profiles.
+type ProfilesCountSizer struct{}
+
+var _ ProfilesSizer = (*ProfilesCountSizer)(nil)
+
+func (s *ProfilesCountSizer) ProfilesSize(pd pprofile.Profiles) int {
+	return pd.SampleCount()
+}
+
+func (s *ProfilesCountSizer) ResourceProfilesSize(rp pprofile.ResourceProfiles) int {
+	count := 0
+	for k := 0; k < rp.ScopeProfiles().Len(); k++ {
+		count += rp.ScopeProfiles().At(k).Profiles().Len()
+	}
+	return count
+}
+
+func (s *ProfilesCountSizer) ScopeProfilesSize(sp pprofile.ScopeProfiles) int {
+	return sp.Profiles().Len()
+}
+
+func (s *ProfilesCountSizer) ProfileSize(_ pprofile.Profile) int {
+	return 1
+}
+
+func (s *ProfilesCountSizer) DeltaSize(newItemSize int) int {
+	return newItemSize
+}

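The least obvious member of the `ProfilesSizer` interface is `DeltaSize`. For the count sizer it is the identity (as the file above shows), while for the bytes sizer it has to account for the protobuf envelope a child message adds to its parent: a field tag plus a varint length prefix on top of the payload. The sketch below illustrates that accounting; `deltaSize` and `varintLen` are hypothetical helpers written only to make the idea concrete, and they are an assumption about what the embedded `protoDeltaSizer` does, not the collector's internal implementation.

```go
package main

import "fmt"

// varintLen returns how many bytes protobuf needs to varint-encode v.
func varintLen(v uint64) int {
	n := 1
	for v >= 0x80 {
		v >>= 7
		n++
	}
	return n
}

// deltaSize sketches the bytes-sizer notion of DeltaSize: nesting a child
// message whose serialized payload is n bytes costs n bytes plus one
// field-tag byte (for small field numbers) plus the varint length prefix.
// Illustrative assumption only, not the actual protoDeltaSizer.
func deltaSize(n int) int {
	return 1 + varintLen(uint64(n)) + n
}

func main() {
	fmt.Println(deltaSize(3))   // 5: 1 tag byte + 1 length byte + 3 payload bytes
	fmt.Println(deltaSize(200)) // 203: the length prefix grows to 2 varint bytes
}
```

Because the count sizer's `DeltaSize(n)` simply returns `n`, the capacity arithmetic in `profiles_batch.go` below degenerates to plain item counting in items mode.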
exporter/exporterhelper/xexporterhelper/profiles.go (+15 -7)
@@ -17,6 +17,7 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/exporter/xexporter"
 	"go.opentelemetry.io/collector/pdata/pprofile"
 	"go.opentelemetry.io/collector/pipeline/xpipeline"
@@ -46,14 +47,14 @@ func NewProfilesQueueBatchSettings() exporterhelper.QueueBatchSettings {
 }

 type profilesRequest struct {
-	pd               pprofile.Profiles
-	cachedItemsCount int
+	pd         pprofile.Profiles
+	cachedSize int
 }

 func newProfilesRequest(pd pprofile.Profiles) exporterhelper.Request {
 	return &profilesRequest{
-		pd:               pd,
-		cachedItemsCount: pd.SampleCount(),
+		pd:         pd,
+		cachedSize: -1,
 	}
 }

@@ -80,11 +81,18 @@ func (req *profilesRequest) OnError(err error) exporterhelper.Request {
 }

 func (req *profilesRequest) ItemsCount() int {
-	return req.cachedItemsCount
+	return req.pd.SampleCount()
 }

-func (req *profilesRequest) setCachedItemsCount(count int) {
-	req.cachedItemsCount = count
+func (req *profilesRequest) size(sizer sizer.ProfilesSizer) int {
+	if req.cachedSize == -1 {
+		req.cachedSize = sizer.ProfilesSize(req.pd)
+	}
+	return req.cachedSize
+}
+
+func (req *profilesRequest) setCachedSize(size int) {
+	req.cachedSize = size
 }

 type profileExporter struct {

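The request now caches its size lazily: `cachedSize` starts at -1, the (potentially expensive) serialized size is computed only the first time a sizer asks for it, and merge/split keeps the cache in sync through `setCachedSize`, while `ItemsCount` always recomputes from `pd.SampleCount()`. A minimal sketch of that -1 sentinel memoization idiom, with hypothetical names, not the collector's code:

```go
package main

import "fmt"

// lazySize sketches the pattern used by profilesRequest.cachedSize: the
// expensive measurement runs at most once and is refreshed explicitly when
// the payload changes.
type lazySize struct {
	cached  int        // -1 means "not measured yet"
	measure func() int // e.g. a full serialized-bytes pass over the payload
}

func (l *lazySize) size() int {
	if l.cached == -1 {
		l.cached = l.measure()
	}
	return l.cached
}

func (l *lazySize) setCachedSize(n int) { l.cached = n }

func main() {
	calls := 0
	s := lazySize{cached: -1, measure: func() int { calls++; return 42 }}
	fmt.Println(s.size(), s.size(), calls) // 42 42 1: measured only once
	s.setCachedSize(40)                    // e.g. after part of the payload was split away
	fmt.Println(s.size())                  // 40
}
```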
exporter/exporterhelper/xexporterhelper/profiles_batch.go (+103 -54)
@@ -8,108 +8,157 @@ import (
 	"errors"

 	"go.opentelemetry.io/collector/exporter/exporterhelper"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/pprofile"
 )

 // MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
-func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, _ exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
+func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
+	var sz sizer.ProfilesSizer
+	switch szt {
+	case exporterhelper.RequestSizerTypeItems:
+		sz = &sizer.ProfilesCountSizer{}
+	case exporterhelper.RequestSizerTypeBytes:
+		sz = &sizer.ProfilesBytesSizer{}
+	default:
+		return nil, errors.New("unknown sizer type")
+	}
+
 	if r2 != nil {
 		req2, ok := r2.(*profilesRequest)
 		if !ok {
 			return nil, errors.New("invalid input type")
 		}
-		req2.mergeTo(req)
+		req2.mergeTo(req, sz)
 	}

 	// If no limit we can simply merge the new request into the current and return.
 	if maxSize == 0 {
 		return []exporterhelper.Request{req}, nil
 	}
-	return req.split(maxSize)
+	return req.split(maxSize, sz), nil
 }

-func (req *profilesRequest) mergeTo(dst *profilesRequest) {
-	dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
-	req.setCachedItemsCount(0)
+func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) {
+	if sz != nil {
+		dst.setCachedSize(dst.size(sz) + req.size(sz))
+		req.setCachedSize(0)
+	}
 	req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles())
 }

-func (req *profilesRequest) split(maxSize int) ([]exporterhelper.Request, error) {
+func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) []exporterhelper.Request {
 	var res []exporterhelper.Request
-	for req.ItemsCount() > maxSize {
-		pd := extractProfiles(req.pd, maxSize)
-		size := pd.SampleCount()
-		req.setCachedItemsCount(req.ItemsCount() - size)
-		res = append(res, &profilesRequest{pd: pd, cachedItemsCount: size})
+	for req.size(sz) > maxSize {
+		pd, rmSize := extractProfiles(req.pd, maxSize, sz)
+		req.setCachedSize(req.size(sz) - rmSize)
+		res = append(res, newProfilesRequest(pd))
 	}
 	res = append(res, req)
-	return res, nil
+	return res
 }

 // extractProfiles extracts a new profiles with a maximum number of samples.
-func extractProfiles(srcProfiles pprofile.Profiles, count int) pprofile.Profiles {
+func extractProfiles(srcProfiles pprofile.Profiles, capacity int, sz sizer.ProfilesSizer) (pprofile.Profiles, int) {
 	destProfiles := pprofile.NewProfiles()
-	srcProfiles.ResourceProfiles().RemoveIf(func(srcRS pprofile.ResourceProfiles) bool {
-		if count == 0 {
+	capacityLeft := capacity - sz.ProfilesSize(destProfiles)
+	removedSize := 0
+	srcProfiles.ResourceProfiles().RemoveIf(func(srcRP pprofile.ResourceProfiles) bool {
+		// If there is no more capacity left, just return.
+		if capacityLeft == 0 {
 			return false
 		}
-		needToExtract := samplesCount(srcRS) > count
-		if needToExtract {
-			srcRS = extractResourceProfiles(srcRS, count)
+		rawRpSize := sz.ResourceProfilesSize(srcRP)
+		rpSize := sz.DeltaSize(rawRpSize)
+
+		if rpSize > capacityLeft {
+			extSrcRP, extRpSize := extractResourceProfiles(srcRP, capacityLeft, sz)
+			// This cannot make it to exactly 0 for the bytes,
+			// force it to be 0 since that is the stopping condition.
+			capacityLeft = 0
+			removedSize += extRpSize
+			// This accounts for the difference between the delta sizes.
+			removedSize += rpSize - rawRpSize - (sz.DeltaSize(rawRpSize-extRpSize) - (rawRpSize - extRpSize))
+			// It is possible that for the bytes scenario, the extracted field contains no profiles.
+			// Do not add it to the destination if that is the case.
+			if extSrcRP.ScopeProfiles().Len() > 0 {
+				extSrcRP.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
+			}
+			return extSrcRP.ScopeProfiles().Len() != 0
 		}
-		count -= samplesCount(srcRS)
-		srcRS.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
-		return !needToExtract
+		capacityLeft -= rpSize
+		removedSize += rpSize
+		srcRP.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
+		return true
 	})
-	return destProfiles
+	return destProfiles, removedSize
 }

 // extractResourceProfiles extracts profiles and returns a new resource profiles with the specified number of profiles.
-func extractResourceProfiles(srcRS pprofile.ResourceProfiles, count int) pprofile.ResourceProfiles {
-	destRS := pprofile.NewResourceProfiles()
-	destRS.SetSchemaUrl(srcRS.SchemaUrl())
-	srcRS.Resource().CopyTo(destRS.Resource())
-	srcRS.ScopeProfiles().RemoveIf(func(srcSS pprofile.ScopeProfiles) bool {
-		if count == 0 {
+func extractResourceProfiles(srcRP pprofile.ResourceProfiles, capacity int, sz sizer.ProfilesSizer) (pprofile.ResourceProfiles, int) {
+	destRP := pprofile.NewResourceProfiles()
+	destRP.SetSchemaUrl(srcRP.SchemaUrl())
+	srcRP.Resource().CopyTo(destRP.Resource())
+	// Take into account that this can have max "capacity", so when added to the parent it will need space for the extra delta size.
+	capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceProfilesSize(destRP)
+	removedSize := 0
+
+	srcRP.ScopeProfiles().RemoveIf(func(srcSS pprofile.ScopeProfiles) bool {
+		// If there is no more capacity left, just return.
+		if capacityLeft == 0 {
 			return false
 		}
-		needToExtract := srcSS.Profiles().Len() > count
-		if needToExtract {
-			srcSS = extractScopeProfiles(srcSS, count)
+
+		rawSlSize := sz.ScopeProfilesSize(srcSS)
+		ssSize := sz.DeltaSize(rawSlSize)
+		if ssSize > capacityLeft {
+			extSrcSS, extSsSize := extractScopeProfiles(srcSS, capacityLeft, sz)
+			// This cannot make it to exactly 0 for the bytes,
+			// force it to be 0 since that is the stopping condition.
+			capacityLeft = 0
+			removedSize += extSsSize
+			// This accounts for the difference between the delta sizes.
+			removedSize += ssSize - rawSlSize - (sz.DeltaSize(rawSlSize-extSsSize) - (rawSlSize - extSsSize))
+			// It is possible that for the bytes scenario, the extracted field contains no profiles.
+			// Do not add it to the destination if that is the case.
+			if extSrcSS.Profiles().Len() > 0 {
+				extSrcSS.MoveTo(destRP.ScopeProfiles().AppendEmpty())
+			}
+			return extSrcSS.Profiles().Len() != 0
 		}
-		count -= srcSS.Profiles().Len()
-		srcSS.MoveTo(destRS.ScopeProfiles().AppendEmpty())
-		return !needToExtract
+		capacityLeft -= ssSize
+		removedSize += ssSize
+		srcSS.MoveTo(destRP.ScopeProfiles().AppendEmpty())
+		return true
 	})
-	srcRS.Resource().CopyTo(destRS.Resource())
-	return destRS
+
+	return destRP, removedSize
 }

 // extractScopeProfiles extracts profiles and returns a new scope profiles with the specified number of profiles.
-func extractScopeProfiles(srcSS pprofile.ScopeProfiles, count int) pprofile.ScopeProfiles {
+func extractScopeProfiles(srcSS pprofile.ScopeProfiles, capacity int, sz sizer.ProfilesSizer) (pprofile.ScopeProfiles, int) {
 	destSS := pprofile.NewScopeProfiles()
 	destSS.SetSchemaUrl(srcSS.SchemaUrl())
 	srcSS.Scope().CopyTo(destSS.Scope())
+	// Take into account that this can have max "capacity", so when added to the parent it will need space for the extra delta size.
+	capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeProfilesSize(destSS)
+	removedSize := 0
 	srcSS.Profiles().RemoveIf(func(srcProfile pprofile.Profile) bool {
-		if count == 0 {
+		// If there is no more capacity left, just return.
+		if capacityLeft == 0 {
+			return false
+		}
+		rsSize := sz.DeltaSize(sz.ProfileSize(srcProfile))
+		if rsSize > capacityLeft {
+			// This cannot make it to exactly 0 for the bytes,
+			// force it to be 0 since that is the stopping condition.
+			capacityLeft = 0
 			return false
 		}
+		capacityLeft -= rsSize
+		removedSize += rsSize
 		srcProfile.MoveTo(destSS.Profiles().AppendEmpty())
-		count--
 		return true
 	})
-	return destSS
-}
-
-// resourceProfilessCount calculates the total number of profiles in the pdata.ResourceProfiles.
-func samplesCount(rs pprofile.ResourceProfiles) int {
-	count := 0
-	rs.ScopeProfiles().RemoveIf(func(ss pprofile.ScopeProfiles) bool {
-		ss.Profiles().RemoveIf(func(sp pprofile.Profile) bool {
-			count += sp.Sample().Len()
-			return false
-		})
-		return false
-	})
-	return count
+	return destSS, removedSize
 }

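To see the merge/split path end to end, here is a hedged, test-style sketch that would live in the same package (assumed to be `xexporterhelper`, matching the directory above). It assumes one sample per profile so that the item sizer's `SampleCount`-based budget lines up with per-profile extraction; `TestMergeSplitItemsSketch` and its exact expectations are illustrative, not an existing test in the PR.

```go
package xexporterhelper

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/pprofile"
)

func TestMergeSplitItemsSketch(t *testing.T) {
	// Four profiles with one sample each, all under a single resource/scope.
	pd := pprofile.NewProfiles()
	sp := pd.ResourceProfiles().AppendEmpty().ScopeProfiles().AppendEmpty()
	for i := 0; i < 4; i++ {
		sp.Profiles().AppendEmpty().Sample().AppendEmpty()
	}

	req := newProfilesRequest(pd).(*profilesRequest)
	res, err := req.MergeSplit(context.Background(), 2, exporterhelper.RequestSizerTypeItems, nil)
	require.NoError(t, err)

	// With a two-item budget and one sample per profile, the request should
	// split into two requests of two samples each.
	require.Len(t, res, 2)
	for _, r := range res {
		require.Equal(t, 2, r.ItemsCount())
	}
}
```

Swapping `RequestSizerTypeItems` for `RequestSizerTypeBytes` exercises the same code path with the serialized-proto sizer, where `maxSize` is interpreted as a byte budget instead of an item count.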