Skip to content

Commit b5e1499

Browse files
pjanottiFiery-Fenix
authored andcommitted
[receiver/hostmetrics] Cheaper parent PID and number of threads retrieval on Windows (open-telemetry#38589)
(To replace PR open-telemetry#35337) Reduces the cost of getting the number of threads and the parent process ID by calling `CreateToolhelp32Snapshot` once to do the process enumeration. The data returned by this function already includes the number of threads and the parent process ID, so the scraper no longer needs to call `CreateToolhelp32Snapshot` for every enumerated process. Fixes open-telemetry#32947 ```terminal > go test -benchmem -run=^$ -bench ^BenchmarkGetProcessMetadata$ -benchtime 10s goos: windows goarch: amd64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper cpu: Intel(R) Core(TM) Ultra 7 165H BenchmarkGetProcessMetadata/Old-IncludeParentPid-22 3 3928594600 ns/op 28357514 B/op 13029 allocs/op BenchmarkGetProcessMetadata/New-IncludeParentPid-22 171 69861262 ns/op 28222817 B/op 12591 allocs/op BenchmarkGetProcessMetadata/Old-ExcludeParentPid-22 172 68011864 ns/op 28232550 B/op 12614 allocs/op BenchmarkGetProcessMetadata/New-ExcludeParentPid-22 169 72172583 ns/op 28351193 B/op 12647 allocs/op PASS ok github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper 81.722s > go test -benchmem -run=^$ -bench ^BenchmarkGetProcessMetadata$ -benchtime 30s goos: windows goarch: amd64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper cpu: Intel(R) Core(TM) Ultra 7 165H BenchmarkGetProcessMetadata/Old-IncludeParentPid-22 8 4115364238 ns/op 28797698 B/op 13220 allocs/op BenchmarkGetProcessMetadata/New-IncludeParentPid-22 514 70165003 ns/op 28702086 B/op 12800 allocs/op BenchmarkGetProcessMetadata/Old-ExcludeParentPid-22 552 70230804 ns/op 28558975 B/op 12750 allocs/op BenchmarkGetProcessMetadata/New-ExcludeParentPid-22 504 73404366 ns/op 28507275 B/op 12707 allocs/op PASS ok github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper 170.587s 
``` ![image](https://github.com/user-attachments/assets/95cb7f75-8ffb-43c0-ac9a-cce1c673d576)
1 parent a801b41 commit b5e1499

File tree

6 files changed

+253
-20
lines changed

6 files changed

+253
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: hostmetricsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: >
11+
Reduced the cost of retrieving the number of threads and the parent process ID on Windows.
12+
Disable the feature gate `hostmetrics.process.onWindowsUseNewGetProcesses` to fall back to the previous implementation.
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [32947, 38589]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:build !windows
5+
6+
package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper"
7+
8+
import (
9+
"context"
10+
11+
"github.com/shirou/gopsutil/v4/process"
12+
)
13+
14+
func getGopsutilProcessHandles(ctx context.Context) (processHandles, error) {
15+
processes, err := process.ProcessesWithContext(ctx)
16+
if err != nil {
17+
return nil, err
18+
}
19+
wrapped := make([]wrappedProcessHandle, len(processes))
20+
for i, p := range processes {
21+
wrapped[i] = wrappedProcessHandle{
22+
Process: p,
23+
}
24+
}
25+
26+
return &gopsProcessHandles{handles: wrapped}, nil
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:build windows
5+
6+
package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper"
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"unsafe"
12+
13+
"github.com/shirou/gopsutil/v4/process"
14+
"go.opentelemetry.io/collector/featuregate"
15+
"golang.org/x/sys/windows"
16+
)
17+
18+
// useNewGetProcessHandles is the feature gate, registered at the Beta stage,
// that selects how process handles are collected on Windows. When it is
// disabled, getGopsutilProcessHandles delegates to
// getGopsutilProcessHandlesLegacy instead of walking a Toolhelp32 snapshot.
var useNewGetProcessHandles = featuregate.GlobalRegistry().MustRegister(
19+
"hostmetrics.process.onWindowsUseNewGetProcesses",
20+
featuregate.StageBeta,
21+
featuregate.WithRegisterDescription("If disabled, the scraper will use the legacy implementation to retrieve process handles."),
22+
)
23+
24+
func getGopsutilProcessHandles(ctx context.Context) (processHandles, error) {
25+
if !useNewGetProcessHandles.IsEnabled() {
26+
return getGopsutilProcessHandlesLegacy(ctx)
27+
}
28+
29+
snap, err := windows.CreateToolhelp32Snapshot(windows.TH32CS_SNAPPROCESS, 0)
30+
if err != nil {
31+
return nil, fmt.Errorf("could not create snapshot: %w", err)
32+
}
33+
defer func() {
34+
_ = windows.CloseHandle(snap)
35+
}()
36+
37+
var pe32 windows.ProcessEntry32
38+
pe32.Size = uint32(unsafe.Sizeof(pe32))
39+
if err = windows.Process32First(snap, &pe32); err != nil {
40+
return nil, fmt.Errorf("could not get first process: %w", err)
41+
}
42+
43+
wrappedProcesses := make([]wrappedProcessHandle, 0, 64)
44+
for {
45+
select {
46+
case <-ctx.Done():
47+
return nil, ctx.Err()
48+
default:
49+
// Ignoring any errors here to keep same behavior as the legacy implementation
50+
// based on the `process.ProcessesWithContext` from the `gopsutil` package.
51+
p, _ := process.NewProcess(int32(pe32.ProcessID))
52+
if p != nil {
53+
wrappedProcess := wrappedProcessHandle{
54+
Process: p,
55+
parentPid: int32(pe32.ParentProcessID),
56+
initialNumThreads: int32(pe32.Threads),
57+
flags: flagParentPidSet | flagUseInitialNumThreadsOnce,
58+
}
59+
wrappedProcesses = append(wrappedProcesses, wrappedProcess)
60+
}
61+
}
62+
63+
if err = windows.Process32Next(snap, &pe32); err != nil {
64+
break
65+
}
66+
}
67+
68+
return &gopsProcessHandles{handles: wrappedProcesses}, nil
69+
}
70+
71+
func getGopsutilProcessHandlesLegacy(ctx context.Context) (processHandles, error) {
72+
processes, err := process.ProcessesWithContext(ctx)
73+
if err != nil {
74+
return nil, err
75+
}
76+
wrapped := make([]wrappedProcessHandle, len(processes))
77+
for i, p := range processes {
78+
wrapped[i] = wrappedProcessHandle{
79+
Process: p,
80+
}
81+
}
82+
83+
return &gopsProcessHandles{handles: wrapped}, nil
84+
}

receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go

+40-15
Original file line numberDiff line numberDiff line change
@@ -109,18 +109,26 @@ func (p *gopsProcessHandles) Pid(index int) int32 {
109109
}
110110

111111
func (p *gopsProcessHandles) At(index int) processHandle {
112-
return p.handles[index]
112+
return &(p.handles[index])
113113
}
114114

115115
// Len reports how many process handles the collection holds.
func (p *gopsProcessHandles) Len() int {
	count := len(p.handles)
	return count
}
118118

119+
// Bit flags stored in wrappedProcessHandle.flags.
const (
	// flagParentPidSet marks that parentPid already holds a usable value.
	flagParentPidSet = 1 << iota
	// flagUseInitialNumThreadsOnce marks that initialNumThreads should be
	// served by the next NumThreadsWithContext call, and only by that call.
	flagUseInitialNumThreadsOnce
)
123+
119124
type wrappedProcessHandle struct {
120125
*process.Process
126+
parentPid int32
127+
initialNumThreads int32
128+
flags uint8 // bitfield to track if fields are set
121129
}
122130

123-
func (p wrappedProcessHandle) CgroupWithContext(ctx context.Context) (string, error) {
131+
func (p *wrappedProcessHandle) CgroupWithContext(ctx context.Context) (string, error) {
124132
pid := p.Process.Pid
125133
statPath := getEnvWithContext(ctx, string(common.HostProcEnvKey), "/proc", strconv.Itoa(int(pid)), "cgroup")
126134
contents, err := os.ReadFile(statPath)
@@ -131,6 +139,36 @@ func (p wrappedProcessHandle) CgroupWithContext(ctx context.Context) (string, er
131139
return strings.TrimSuffix(string(contents), "\n"), nil
132140
}
133141

142+
func (p *wrappedProcessHandle) PpidWithContext(ctx context.Context) (int32, error) {
143+
if p.flags&flagParentPidSet != 0 {
144+
return p.parentPid, nil
145+
}
146+
147+
parentPid, err := p.Process.PpidWithContext(ctx)
148+
if err != nil {
149+
return 0, err
150+
}
151+
152+
p.parentPid = parentPid
153+
p.flags |= flagParentPidSet
154+
return parentPid, nil
155+
}
156+
157+
func (p *wrappedProcessHandle) NumThreadsWithContext(ctx context.Context) (int32, error) {
158+
if p.flags&flagUseInitialNumThreadsOnce != 0 {
159+
// The number of threads can fluctuate so use the initially cached value only the first time.
160+
p.flags &^= flagUseInitialNumThreadsOnce
161+
return p.initialNumThreads, nil
162+
}
163+
164+
numThreads, err := p.Process.NumThreadsWithContext(ctx)
165+
if err != nil {
166+
return 0, err
167+
}
168+
169+
return numThreads, nil
170+
}
171+
134172
// copied from gopsutil:
135173
// GetEnvWithContext retrieves the environment variable key. If it does not exist it returns the default.
136174
// The context may optionally contain a map superseding os.EnvKey.
@@ -150,19 +188,6 @@ func getEnvWithContext(ctx context.Context, key string, dfault string, combineWi
150188
return filepath.Join(segments...)
151189
}
152190

153-
func getProcessHandlesInternal(ctx context.Context) (processHandles, error) {
154-
processes, err := process.ProcessesWithContext(ctx)
155-
if err != nil {
156-
return nil, err
157-
}
158-
wrapped := make([]wrappedProcessHandle, len(processes))
159-
for i, p := range processes {
160-
wrapped[i] = wrappedProcessHandle{Process: p}
161-
}
162-
163-
return &gopsProcessHandles{handles: wrapped}, nil
164-
}
165-
166191
func parentPid(ctx context.Context, handle processHandle, pid int32) (int32, error) {
167192
// special case for pid 0 and pid 1 in darwin
168193
if pid == 0 || (pid == 1 && runtime.GOOS == "darwin") {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:build windows
5+
6+
package processscraper
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/featuregate"
14+
"go.opentelemetry.io/collector/scraper"
15+
)
16+
17+
func BenchmarkGetProcessMetadata(b *testing.B) {
18+
ctx := context.Background()
19+
config := &Config{}
20+
21+
scraper, err := newProcessScraper(scraper.Settings{}, config)
22+
if err != nil {
23+
b.Fatalf("Failed to create process scraper: %v", err)
24+
}
25+
26+
benchmarks := []struct {
27+
name string
28+
useLegacy bool
29+
parentPidEnabled bool
30+
}{
31+
{
32+
name: "New-ExcludeParentPid",
33+
},
34+
{
35+
name: "Old-ExcludeParentPid",
36+
useLegacy: true,
37+
},
38+
{
39+
name: "New-IncludeParentPid",
40+
parentPidEnabled: true,
41+
},
42+
{
43+
name: "Old-IncludeParentPid",
44+
parentPidEnabled: true,
45+
useLegacy: true,
46+
},
47+
}
48+
49+
for _, bm := range benchmarks {
50+
b.Run(bm.name, func(b *testing.B) {
51+
// Set feature gate value
52+
previousValue := useNewGetProcessHandles.IsEnabled()
53+
require.NoError(b, featuregate.GlobalRegistry().Set(useNewGetProcessHandles.ID(), !bm.useLegacy))
54+
defer func() {
55+
require.NoError(b, featuregate.GlobalRegistry().Set(useNewGetProcessHandles.ID(), previousValue))
56+
}()
57+
scraper.config.MetricsBuilderConfig.ResourceAttributes.ProcessParentPid.Enabled = bm.parentPidEnabled
58+
59+
for i := 0; i < b.N; i++ {
60+
// Typically there are errors, but we are not interested in them for this benchmark
61+
_, _ = scraper.getProcessMetadata(ctx)
62+
}
63+
})
64+
}
65+
}

receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func newProcessScraper(settings scraper.Settings, cfg *Config) (*processScraper,
6666
settings: settings,
6767
config: cfg,
6868
getProcessCreateTime: processHandle.CreateTimeWithContext,
69-
getProcessHandles: getProcessHandlesInternal,
69+
getProcessHandles: getGopsutilProcessHandles,
7070
scrapeProcessDelay: cfg.ScrapeProcessDelay,
7171
ucals: make(map[int32]*ucal.CPUUtilizationCalculator),
7272
handleCountManager: handlecount.NewManager(),
@@ -271,14 +271,17 @@ func (s *processScraper) getProcessMetadata(ctx context.Context) ([]*processMeta
271271
continue
272272
}
273273

274-
parentPid, err := parentPid(ctx, handle, pid)
275-
if err != nil {
276-
errs.AddPartial(0, fmt.Errorf("error reading parent pid for process %q (pid %v): %w", executable.name, pid, err))
274+
parentProcessID := int32(0)
275+
if s.config.MetricsBuilderConfig.ResourceAttributes.ProcessParentPid.Enabled {
276+
parentProcessID, err = parentPid(ctx, handle, pid)
277+
if err != nil {
278+
errs.AddPartial(0, fmt.Errorf("error reading parent pid for process %q (pid %v): %w", executable.name, pid, err))
279+
}
277280
}
278281

279282
md := &processMetadata{
280283
pid: pid,
281-
parentPid: parentPid,
284+
parentPid: parentProcessID,
282285
executable: executable,
283286
command: command,
284287
username: username,

0 commit comments

Comments
 (0)