Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using new otlpfile pipeline for self metrics #1922

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
10 changes: 2 additions & 8 deletions cmd/google_cloud_ops_agent_diagnostics/main_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,19 @@ import (
"syscall"

"github.com/GoogleCloudPlatform/ops-agent/cmd/google_cloud_ops_agent_diagnostics/utils"
"github.com/GoogleCloudPlatform/ops-agent/internal/self_metrics"
)

var (
config = flag.String("config", "/etc/google-cloud-ops-agent/config.yaml", "path to the user specified agent config")
)

func run(ctx context.Context) error {
userUc, mergedUc, err := utils.GetUserAndMergedConfigs(ctx, *config)
_, _, err := utils.GetUserAndMergedConfigs(ctx, *config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can just return an error now; I don't think we ever use the first two return values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will all go away in the next PR, which removes the diagnostics binary. I'm trying to change as little as I can while maintaining functionality, to make this easier to review.

if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
_, cancel := context.WithCancel(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this anymore then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will all go away in the next PR, which removes the diagnostics binary. I'm trying to change as little as I can while maintaining functionality, to make this easier to review.

defer cancel()

go func() {
Expand All @@ -54,10 +53,5 @@ func run(ctx context.Context) error {
}
}()

err = self_metrics.CollectOpsAgentSelfMetrics(ctx, userUc, mergedUc)
if err != nil {
return err
}

return nil
}
9 changes: 1 addition & 8 deletions cmd/google_cloud_ops_agent_diagnostics/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"

"github.com/GoogleCloudPlatform/ops-agent/cmd/google_cloud_ops_agent_diagnostics/utils"
"github.com/GoogleCloudPlatform/ops-agent/internal/self_metrics"
"go.opentelemetry.io/otel"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/debug"
Expand Down Expand Up @@ -81,7 +80,7 @@ func (s *service) Execute(args []string, r <-chan svc.ChangeRequest, changes cha
return false, ERROR_INVALID_PARAMETER
}

userUc, mergedUc, err := utils.GetUserAndMergedConfigs(ctx, s.userConf)
_, _, err := utils.GetUserAndMergedConfigs(ctx, s.userConf)
if err != nil {
s.log.Error(DiagnosticsEventID, fmt.Sprintf("failed to obtain unified configuration: %v", err))
return false, ERROR_FILE_NOT_FOUND
Expand Down Expand Up @@ -113,12 +112,6 @@ func (s *service) Execute(args []string, r <-chan svc.ChangeRequest, changes cha
// Set otel error handler
otel.SetErrorHandler(s)

err = self_metrics.CollectOpsAgentSelfMetrics(ctx, userUc, mergedUc)
if err != nil {
s.log.Error(DiagnosticsEventID, fmt.Sprintf("failed to collect ops agent self metrics: %v", err))
return false, ERROR_INVALID_DATA
}

return false, ERROR_SUCCESS
}

Expand Down
7 changes: 0 additions & 7 deletions cmd/ops_agent_uap_plugin/service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,6 @@ func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallD
})

var wg sync.WaitGroup
// Starting the diagnostics service
runDiagnosticsCmd := exec.CommandContext(ctx,
path.Join(pluginInstallDirectory, DiagnosticsBinary),
"-config", OpsAgentConfigLocationLinux,
)
wg.Add(1)
go runSubAgentCommand(ctx, cancel, runDiagnosticsCmd, runCommand, &wg)

// Starting Otel
runOtelCmd := exec.CommandContext(ctx,
Expand Down
33 changes: 2 additions & 31 deletions cmd/ops_agent_uap_plugin/service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ import (
"unsafe"

"github.com/GoogleCloudPlatform/ops-agent/apps"
"github.com/GoogleCloudPlatform/ops-agent/cmd/google_cloud_ops_agent_diagnostics/utils"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/internal/healthchecks"
"github.com/GoogleCloudPlatform/ops-agent/internal/logs"
"github.com/GoogleCloudPlatform/ops-agent/internal/self_metrics"
"github.com/kardianos/osext"
"go.opentelemetry.io/otel"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc/debug"
"golang.org/x/sys/windows/svc/eventlog"
Expand Down Expand Up @@ -168,8 +166,7 @@ func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest)
windowsEventLogger.Close()
}

otelErrorHandler := &otelErrorHandler{windowsEventLogger: windowsEventLogger, windowsEventId: OpsAgentUAPPluginEventID}
go runSubagents(pContext, cancelFunc, pluginInstallDir, pluginStateDir, runSubAgentCommand, ps.runCommand, otelErrorHandler)
go runSubagents(pContext, cancelFunc, pluginInstallDir, pluginStateDir, runSubAgentCommand, ps.runCommand)

return &pb.StartResponse{}, nil
}
Expand Down Expand Up @@ -386,14 +383,9 @@ func createWindowsJobHandle() (windows.Handle, error) {
//
// cancel: the cancel function for the parent context. By calling this function, the parent context is canceled,
// and GetStatus() returns a non-healthy status, signaling UAP to re-trigger Start().
//
// otelErrorHandler: an implementation of otel.ErrorHandler that is used in the diagnostics service to log otel errors to the Windows event log.
func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallDirectory string, pluginStateDirectory string, runSubAgentCommand RunSubAgentCommandFunc, runCommand RunCommandFunc, otelErrorHandler otel.ErrorHandler) {
func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallDirectory string, pluginStateDirectory string, runSubAgentCommand RunSubAgentCommandFunc, runCommand RunCommandFunc) {

var wg sync.WaitGroup
// Starting the diagnostics service
wg.Add(1)
go runDiagnosticsService(ctx, cancel, otelErrorHandler, &wg)

// Starting Otel
runOtelCmd := exec.CommandContext(ctx,
Expand All @@ -419,27 +411,6 @@ func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallD
wg.Wait()
}

// runDiagnosticsService loads the user and merged configs from
// OpsAgentConfigLocationWindows, installs otelErrorHandler as the global otel
// error handler, and then runs the Ops Agent self-metrics collection.
//
// If loading the configs or collecting the metrics returns an error, the error
// is logged and cancel is called. wg.Done is always called on return.
func runDiagnosticsService(ctx context.Context, cancel context.CancelFunc, otelErrorHandler otel.ErrorHandler, wg *sync.WaitGroup) {
	defer wg.Done()

	// Both failure paths report the error the same way and cancel the parent context.
	abort := func(cause error) {
		log.Printf("Failed to run the diagnostics service: %v", cause)
		cancel()
	}

	userConfig, mergedConfig, err := utils.GetUserAndMergedConfigs(ctx, OpsAgentConfigLocationWindows)
	if err != nil {
		abort(err)
		return
	}

	// The handler is installed only after the configs were loaded successfully.
	otel.SetErrorHandler(otelErrorHandler)

	if err := self_metrics.CollectOpsAgentSelfMetrics(ctx, userConfig, mergedConfig); err != nil {
		abort(err)
	}
}

func runCommand(cmd *exec.Cmd) (string, error) {
if cmd == nil {
return "", nil
Expand Down
8 changes: 0 additions & 8 deletions cmd/ops_agent_windows/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ func initServices() error {
"--storage_path", fluentbitStoragePath,
},
},
{
fmt.Sprintf("%s-diagnostics", serviceName),
fmt.Sprintf("%s - Diagnostics", serviceDisplayName),
filepath.Join(base, fmt.Sprintf("%s-diagnostics.exe", serviceName)),
[]string{
"-config", filepath.Join(base, "../config/config.yaml"),
},
},
}
return nil
}
27 changes: 27 additions & 0 deletions confgenerator/agentmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package confgenerator

import (
"context"
"fmt"
"path/filepath"
"time"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl"
Expand Down Expand Up @@ -166,4 +169,28 @@ func (r AgentSelfMetrics) LoggingSubmodulePipeline() otel.ReceiverPipeline {
}
}

// OpsAgentSelfMetricsPipeline returns the receiver pipeline that reads the Ops
// Agent self metrics from OTLP JSON files and exports them as system metrics.
//
// The "otlpjsonfile" receiver is configured to include
// enabled_receivers_otlp.json and feature_tracking_otlp.json joined onto
// outDir, with replay_file set to true and a poll interval of one minute.
// When outDir is empty the include entries are the bare file names.
//
// A metrics transform processor sets the time of every datapoint to Now().
// NOTE(review): presumably this is needed because replayed datapoints would
// otherwise keep the timestamp stored in the file — confirm.
//
// ctx is not used by this function; it is kept in the signature for callers.
func OpsAgentSelfMetricsPipeline(ctx context.Context, outDir string) otel.ReceiverPipeline {
	receiverConfig := map[string]any{
		"include": []string{
			filepath.Join(outDir, "enabled_receivers_otlp.json"),
			filepath.Join(outDir, "feature_tracking_otlp.json"),
		},
		"replay_file": true,
		// time.Minute is already a time.Duration; String() renders it as "1m0s".
		"poll_interval": time.Minute.String(),
	}
	return otel.ReceiverPipeline{
		Receiver: otel.Component{
			Type:   "otlpjsonfile",
			Config: receiverConfig,
		},
		ExporterTypes: map[string]otel.ExporterType{
			"metrics": otel.System,
		},
		Processors: map[string][]otel.Component{
			"metrics": {
				otel.Transform("metric", "datapoint", []ottl.Statement{"set(time, Now())"}),
			},
		},
	}
}

// intentionally not registered as a component because this is not created by users
8 changes: 7 additions & 1 deletion confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (uc *UnifiedConfig) getOTelLogLevel() string {
return logLevel
}

func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context) (string, error) {
func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context, outDir string) (string, error) {
p := platform.FromContext(ctx)
userAgent, _ := p.UserAgent("Google-Cloud-Ops-Agent-Metrics")
metricVersionLabel, _ := p.VersionLabel("google-cloud-ops-agent-metrics")
Expand All @@ -100,6 +100,12 @@ func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context) (string, error)
ReceiverPipelineName: "otel",
}

receiverPipelines["ops_agent"] = OpsAgentSelfMetricsPipeline(ctx, outDir)
pipelines["ops_agent"] = otel.Pipeline{
Type: "metrics",
ReceiverPipelineName: "ops_agent",
}

receiverPipelines["fluentbit"] = AgentSelfMetrics{
Version: loggingVersionLabel,
Port: fluentbit.MetricsPort,
Expand Down
2 changes: 1 addition & 1 deletion confgenerator/confgenerator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func generateConfigs(pc platformConfig, testDir string) (got map[string]string,
}

// Otel configs
otelGeneratedConfig, err := mergedUc.GenerateOtelConfig(ctx)
otelGeneratedConfig, err := mergedUc.GenerateOtelConfig(ctx, "")
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion confgenerator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ func (uc *UnifiedConfig) OTelLoggingSupported(ctx context.Context) bool {
ucLoggingCopy.Logging.Service = &LoggingService{}
}
ucLoggingCopy.Logging.Service.OTelLogging = true
_, err = ucLoggingCopy.GenerateOtelConfig(ctx)
_, err = ucLoggingCopy.GenerateOtelConfig(ctx, "")
return err == nil
}

Expand Down
2 changes: 1 addition & 1 deletion confgenerator/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (uc *UnifiedConfig) GenerateFilesFromConfig(ctx context.Context, service, l
}
}
case "otel":
otelConfig, err := uc.GenerateOtelConfig(ctx)
otelConfig, err := uc.GenerateOtelConfig(ctx, outDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] In the Fluent Bit case, the Generate command just returns the files and then we write them in this function. For OTel, though, it looks like we write the file inside this call. Let's make the behaviour consistent between the two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do on a follow up

Copy link
Contributor

@franciscovalentecastro franciscovalentecastro Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't write the OTel generated configs inside GenerateOtelConfig; they are written here, the same way as in the fluent-bit case (see the next 10 lines).

In this PR, @rafaelwestphal is passing outDir so that GenerateOtelConfig knows where the *_otlp.json files were written.

if err != nil {
return fmt.Errorf("can't parse configuration: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ processors:
resourcedetection/_global_0:
detectors:
- gcp
transform/ops_agent_0:
error_mode: ignore
metric_statements:
- context: datapoint
statements:
- set(time, Now())
transform/otel_1:
error_mode: ignore
metric_statements:
Expand All @@ -471,6 +477,12 @@ receivers:
processes: {}
nvml/hostmetrics_1:
collection_interval: 60s
otlpjsonfile/ops_agent:
include:
- enabled_receivers_otlp.json
- feature_tracking_otlp.json
poll_interval: 1m0s
replay_file: true
prometheus/fluentbit:
config:
scrape_configs:
Expand Down Expand Up @@ -517,6 +529,14 @@ service:
- resourcedetection/_global_0
receivers:
- prometheus/fluentbit
metrics/ops_agent:
exporters:
- googlecloud
processors:
- transform/ops_agent_0
- resourcedetection/_global_0
receivers:
- otlpjsonfile/ops_agent
metrics/otel:
exporters:
- googlecloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ processors:
resourcedetection/_global_0:
detectors:
- gcp
transform/ops_agent_0:
error_mode: ignore
metric_statements:
- context: datapoint
statements:
- set(time, Now())
transform/otel_1:
error_mode: ignore
metric_statements:
Expand All @@ -445,6 +451,12 @@ receivers:
mute_process_exe_error: true
mute_process_name_error: true
processes: {}
otlpjsonfile/ops_agent:
include:
- enabled_receivers_otlp.json
- feature_tracking_otlp.json
poll_interval: 1m0s
replay_file: true
prometheus/fluentbit:
config:
scrape_configs:
Expand Down Expand Up @@ -483,6 +495,14 @@ service:
- resourcedetection/_global_0
receivers:
- prometheus/fluentbit
metrics/ops_agent:
exporters:
- googlecloud
processors:
- transform/ops_agent_0
- resourcedetection/_global_0
receivers:
- otlpjsonfile/ops_agent
metrics/otel:
exporters:
- googlecloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ processors:
resourcedetection/_global_0:
detectors:
- gcp
transform/ops_agent_0:
error_mode: ignore
metric_statements:
- context: datapoint
statements:
- set(time, Now())
transform/otel_1:
error_mode: ignore
metric_statements:
Expand All @@ -455,6 +461,12 @@ receivers:
mute_process_exe_error: true
mute_process_name_error: true
processes: {}
otlpjsonfile/ops_agent:
include:
- enabled_receivers_otlp.json
- feature_tracking_otlp.json
poll_interval: 1m0s
replay_file: true
prometheus/fluentbit:
config:
scrape_configs:
Expand Down Expand Up @@ -493,6 +505,14 @@ service:
- resourcedetection/_global_0
receivers:
- prometheus/fluentbit
metrics/ops_agent:
exporters:
- googlecloud
processors:
- transform/ops_agent_0
- resourcedetection/_global_0
receivers:
- otlpjsonfile/ops_agent
metrics/otel:
exporters:
- googlecloud
Expand Down
Loading