make GCP provider work concurrently #3152


Open: orouz wants to merge 15 commits into main from gcp_ch

Conversation


@orouz orouz commented Mar 30, 2025

Summary of your changes

the core of this PR is to make all work done in the gcplib provider run concurrently. this means there's a pipeline of fetching, merging and enriching; once an asset is done, it's sent to the fetcher, which in turn sends it to the resourceCh of the flavor pipeline (cspm/assets inventory).
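for illustration, here is a minimal Go sketch of that pipeline shape (all names are hypothetical stand-ins, not the provider's actual API):

```go
package main

import "context"

// Asset is a hypothetical stand-in for the provider's asset type.
type Asset struct{ Name string }

// fetch streams assets into out and closes it when done, so downstream
// stages can simply range over the channel.
func fetch(ctx context.Context, out chan<- *Asset) {
	defer close(out)
	for _, a := range []*Asset{{Name: "a"}, {Name: "b"}} {
		select {
		case out <- a:
		case <-ctx.Done():
			return
		}
	}
}

// enrich consumes fetched assets, decorates them, and forwards them.
func enrich(ctx context.Context, in <-chan *Asset, out chan<- *Asset) {
	defer close(out)
	for a := range in {
		// ... merge content types, attach cloud metadata, etc. ...
		select {
		case out <- a:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx := context.Background()
	fetched := make(chan *Asset)
	enriched := make(chan *Asset)
	go fetch(ctx, fetched)
	go enrich(ctx, fetched, enriched)
	for range enriched {
		// in the real code, each asset is forwarded here to the
		// flavor pipeline's resourceCh
	}
}
```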

specific and notable changes are mentioned in PR comments.

Screenshot/Data

CSPM GCP: same total findings count (1149) and same findings count per rule (with the same passed/failed counts) as in the live env (elastic-security-test):

| rule.benchmark.rule_number | Count of records |
| --- | --- |
| 3.8 | 826 |
| 1.7 | 65 |
| 2.12 | 19 |
| 3.1 | 19 |
| 3.2 | 19 |
| 4.7 | 15 |
| 4.1 | 14 |
| 4.2 | 14 |
| 4.3 | 14 |
| 4.4 | 14 |
| 4.5 | 14 |
| 4.6 | 14 |
| 4.8 | 14 |
| 4.9 | 14 |
| 1.4 | 9 |
| 3.6 | 9 |
| 3.7 | 9 |
| 1.12 | 4 |
| 1.14 | 4 |
| 1.15 | 4 |
| 1.10 | 3 |
| 1.9 | 3 |
| 5.1 | 3 |
| 5.2 | 3 |
| 2.3 | 2 |
| 1.11 | 1 |
| 1.17 | 1 |
| 1.5 | 1 |
| 1.6 | 1 |
| 1.8 | 1 |
| 2.10 | 1 |
| 2.11 | 1 |
| 2.13 | 1 |
| 2.16 | 1 |
| 2.4 | 1 |
| 2.5 | 1 |
| 2.6 | 1 |
| 2.7 | 1 |
| 2.8 | 1 |
| 2.9 | 1 |
| 3.3 | 1 |
| 3.4 | 1 |
| 3.5 | 1 |
| 7.1 | 1 |
| 7.2 | 1 |
| 7.3 | 1 |

Assets Inventory GCP: 19 more assets than in the live env (elastic-security-test), due to the network assets addition, which is on `8.x`/`main` and not on `8.17`:

| cloud.service.name | Count of records |
| --- | --- |
| compute.googleapis.com/Subnetwork | 826 |
| iam.googleapis.com/ServiceAccountKey | 65 |
| iam.googleapis.com/ServiceAccount | 44 |
| compute.googleapis.com/Network | 19 |
| compute.googleapis.com/Instance | 14 |
| compute.googleapis.com/Firewall | 9 |
| iam.googleapis.com/Role | 3 |
| storage.googleapis.com/Bucket | 3 |
| cloudresourcemanager.googleapis.com/Project | 1 |
| run.googleapis.com/Service | 1 |

  • also ran these changes in CSPM GCP organization-account mode for 7 consecutive runs (5-minute period) with the same number of assets in each cycle

Related Issues


mergify bot commented Mar 30, 2025

This pull request is now in conflict. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b gcp_ch upstream/gcp_ch
git merge upstream/main
git push upstream gcp_ch

@mergify mergify bot assigned orouz Mar 30, 2025

mergify bot commented Mar 30, 2025

This pull request does not have a backport label. Could you fix it @orouz? 🙏
To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v\d.\d.\d is the label to automatically backport to the 8.\d branch, where \d is a digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

NOTE: backport-v8.x has been added to help with the transition to the new branch 8.x.

@orouz orouz force-pushed the gcp_ch branch 3 times, most recently from 1d5beb8 to b0b857e, March 30, 2025 16:04

mergify bot commented Apr 1, 2025

This pull request is now in conflict. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b gcp_ch upstream/gcp_ch
git merge upstream/main
git push upstream gcp_ch

orouz (Collaborator Author) commented:

this is a new fetcher for network assets. it was added to avoid an enrichment attempt that previously ran on all assets instead of just network assets:

p.enrichNetworkAssets(ctx, extendedAssets)

network assets are now fetched and enriched in their own fetcher, so this attempt is skipped when fetching all other assets in assets_fetcher.
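a minimal sketch of that separation, with hypothetical stand-in names (not the actual fetcher code):

```go
package fetchers

import "context"

// ExtendedGcpAsset and networkProvider are hypothetical stand-ins.
type ExtendedGcpAsset struct{ Type string }

type networkProvider interface {
	// streams only network asset types and closes out when done
	ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset)
}

type NetworksFetcher struct{ provider networkProvider }

// Fetch enriches network assets inside their own fetcher, so the generic
// assets fetcher no longer runs an enrichment pass over every asset.
func (f *NetworksFetcher) Fetch(ctx context.Context, out chan<- *ExtendedGcpAsset) {
	assets := make(chan *ExtendedGcpAsset)
	go f.provider.ListNetworkAssets(ctx, assets)
	for a := range assets {
		// attach the DNS policy here; only network assets need this
		out <- a
	}
}
```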

Comment on lines -108 to -111
ListLoggingAssets(ctx context.Context) ([]*LoggingAsset, error)

// ListServiceUsageAssets returns a list of service usage assets grouped by project id
ListServiceUsageAssets(ctx context.Context) ([]*ServiceUsageAsset, error)
orouz (Collaborator Author) commented:

these two were combined into ListProjectAssets

Comment on lines 134 to 143
+ resourceCh := make(chan *assetpb.Asset) // *assetpb.Asset with Resource
+ policyCh := make(chan *assetpb.Asset)   // *assetpb.Asset with IamPolicy
+ mergeCh := make(chan *assetpb.Asset)    // *assetpb.Asset with Resource and IamPolicy
+ enrichCh := make(chan *ExtendedGcpAsset)

- var assets []*assetpb.Asset
- assets = append(append(assets, resourceAssets...), policyAssets...)
- mergedAssets := mergeAssetContentType(assets)
- extendedAssets := p.extendWithCloudMetadata(ctx, mergedAssets)
- // Enrich network assets with dns policy
- p.enrichNetworkAssets(ctx, extendedAssets)
+ go p.getAllAssets(ctx, p.config.Parent, assetpb.ContentType_RESOURCE, assetTypes, resourceCh)
+ go p.getAllAssets(ctx, p.config.Parent, assetpb.ContentType_IAM_POLICY, assetTypes, policyCh)
+ go p.mergeAssets(ctx, mergeCh, resourceCh, policyCh)
+ go p.enrichAssets(ctx, mergeCh, enrichCh)

- return extendedAssets, nil
+ for asset := range enrichCh {
+ 	out <- asset
+ }
orouz (Collaborator Author) commented:

this method is used by the main fetcher (assets_fetcher). it starts fetching resources and policies concurrently and merges them by name. the merging process sends an asset to the enrichment channel if: 1) the asset has both a resource and a policy, or 2) the asset has either a resource or a policy and the other channel is closed, so there's no need to wait for a merge with the other content type.

after an asset is enriched, it's sent to the out channel, which is received in the fetcher and passed to the resourceCh to start the relevant flavor (cspm/assets inventory) pipeline.
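a minimal sketch of that merge rule (hypothetical types; mergeHalves stands in for the real name-based merge, and context handling is omitted for brevity):

```go
package inventory

// Asset is a hypothetical stand-in keyed by name.
type Asset struct{ Name string }

// mergeHalves stands in for combining a RESOURCE asset with its
// IAM_POLICY counterpart.
func mergeHalves(a, b *Asset) *Asset { return a }

// merge pairs resource and policy assets by name. An asset is emitted as
// soon as both halves have arrived, or immediately if the other channel
// is already closed (no pair can ever arrive).
func merge(resourceCh, policyCh <-chan *Asset, out chan<- *Asset) {
	defer close(out)
	pending := map[string]*Asset{}
	for resourceCh != nil || policyCh != nil {
		var a *Asset
		var ok bool
		select {
		case a, ok = <-resourceCh:
			if !ok {
				resourceCh = nil // stop selecting on a closed channel
				continue
			}
		case a, ok = <-policyCh:
			if !ok {
				policyCh = nil
				continue
			}
		}
		switch prev, found := pending[a.Name]; {
		case found: // both halves present: merge and emit
			delete(pending, a.Name)
			out <- mergeHalves(prev, a)
		case resourceCh == nil || policyCh == nil: // other side closed
			out <- a
		default:
			pending[a.Name] = a
		}
	}
	for _, a := range pending { // flush assets that never found a pair
		out <- a
	}
}
```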

- logMetrics, err := p.ListAllAssetTypesByName(ctx, monitoringAssetTypes["LogMetric"])
- if err != nil {
- 	return nil, err
+ func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
orouz (Collaborator Author) commented:

this method used to fetch all resources and policies for 2 asset types, group them by project, and, once all assets were grouped, return a slice with all the groups.

it now does the same thing, but sends each group as soon as it's ready instead of waiting for all of them. also, only resources are fetched now (no policies, as the CIS rules don't require them; not all of these asset types necessarily have a policy, so trying to fetch them was redundant).
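roughly, the before/after shape (a sketch with hypothetical helpers, not the actual provider code):

```go
package inventory

import "context"

// MonitoringAsset is a hypothetical stand-in for the grouped type.
type MonitoringAsset struct {
	Project string
	Assets  []string
}

// fetchGroup stands in for fetching one project's log metrics and alert
// policies (resources only; IAM policies are no longer requested).
func fetchGroup(ctx context.Context, project string) []string { return nil }

// Before (shape only): build every group, then return them all at once.
func listMonitoringAssetsBatch(ctx context.Context, projects []string) []*MonitoringAsset {
	var all []*MonitoringAsset
	for _, p := range projects {
		all = append(all, &MonitoringAsset{Project: p, Assets: fetchGroup(ctx, p)})
	}
	return all
}

// After: stream each group out as soon as it is ready.
func listMonitoringAssets(ctx context.Context, projects []string, out chan<- *MonitoringAsset) {
	defer close(out)
	for _, p := range projects {
		out <- &MonitoringAsset{Project: p, Assets: fetchGroup(ctx, p)}
	}
}
```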

- if err != nil {
- 	return nil, err
- }
+ func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset) {
orouz (Collaborator Author) commented:

this used to send a slice of items, each representing a group of assets of the same project. it now does the same, but sends each group separately so we don't wait for all of them (the same streaming pattern sketched above).

- var assets []*ExtendedGcpAsset
- assets = append(append(assets, logMetrics...), alertPolicies...)
- monitoringAssets := getAssetsByProject[MonitoringAsset](assets, p.log, typeGenerator)
+ func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
orouz (Collaborator Author) commented:

this function now handles what ListLoggingAssets and ListServiceUsageAssets used to do. both used to fetch the resource and policy for some asset types, group them all by project, and send a slice with every group.

it now does the same thing, but starts by fetching projects and then, for each project, fetches only the resources of the given asset types and sends them out (see the sketch below). the difference is that we don't wait for all assets from all projects, plus we no longer fetch policies, as they weren't used by the relevant CIS rules.
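a sketch of that flow (hypothetical helpers, not the actual implementation):

```go
package inventory

import "context"

// ProjectAssets is a hypothetical stand-in for the per-project group.
type ProjectAssets struct {
	Project string
	Assets  []string
}

// fetchProjects stands in for listing the projects under the parent.
func fetchProjects(ctx context.Context) []string { return nil }

// fetchResources stands in for fetching only the RESOURCE content type
// for the given asset types; IAM policies are skipped entirely.
func fetchResources(ctx context.Context, project string, assetTypes []string) []string { return nil }

// listProjectAssets fetches the project list first, then streams each
// project's resources out without waiting for the other projects.
func listProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
	defer close(out)
	for _, p := range fetchProjects(ctx) {
		out <- &ProjectAssets{Project: p, Assets: fetchResources(ctx, p, assetTypes)}
	}
}
```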

@oren-zohar oren-zohar requested review from Copilot and moukoublen April 3, 2025 11:37

@Copilot Copilot AI left a comment


Pull Request Overview

This PR enhances the GCP provider to run its workflows concurrently by refactoring asset fetchers to use channels and goroutines. Key changes include:

  • Refactoring of multiple fetchers (Service Usage, Policies, Monitoring, Networks, Log Sink, and Assets) to use concurrent channel-based asset delivery.
  • Renaming and updating mock functions to support the new asynchronous calls.
  • Updating tests to reflect the new concurrent behavior and added network asset support.

Reviewed Changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| internal/resources/providers/gcplib/inventory/mock_service_api.go | Added a Clear function and renamed mock helper functions to support the new ListAssetTypes method. |
| internal/resources/fetching/preset/gcp_preset.go | Added initialization for the network assets fetcher. |
| internal/resources/fetching/fetchers/gcp/* | Modified fetchers (Service Usage, Policies, Monitoring, Networks, Log Sink, Assets) to run concurrently via channels. |
| internal/inventory/gcpfetcher/* | Updated the asset fetcher and mock provider functions to replace ListAllAssetTypesByName with ListAssetTypes, and adjusted tests accordingly. |
Comments suppressed due to low confidence (2)

internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go:38

  • [nitpick] Consider standardizing the naming of the asset subtype field across fetchers. In some files the field is named 'subType' (e.g. here) while in others it is 'SubType'; standardizing this will improve code consistency.
subType string

internal/resources/fetching/fetchers/gcp/assets_fetcher.go:86

  • Ensure that the implementation of ListAssetTypes in the provider always closes the results channel; otherwise, the for-select loop may hang if the channel is never closed.
go f.provider.ListAssetTypes(ctx, lo.Keys(reversedGcpAssetTypes), resultsCh)
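the usual way to guarantee that (a sketch, not the provider's actual implementation) is for the producer to own the channel and defer-close it on every return path:

```go
package inventory

import "context"

// Asset is a hypothetical stand-in type.
type Asset struct{ Type string }

// listAssetTypes owns out and closes it on every return path, so a
// consumer ranging over the channel can never hang.
func listAssetTypes(ctx context.Context, assetTypes []string, out chan<- *Asset) {
	defer close(out) // runs on normal return and on cancellation alike
	for _, t := range assetTypes {
		select {
		case out <- &Asset{Type: t}: // placeholder for the real fetch
		case <-ctx.Done():
			return
		}
	}
}
```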

Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error)
}

func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
func newAssetsInventoryWrapper(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (*AssetsInventoryWrapper, error) {
limiter := NewAssetsInventoryRateLimiter(log)
orouz (Collaborator Author) commented Apr 3, 2025:

we use rate limiting for the GCP Assets Inventory ListAssets method, and because we're now making more concurrent calls, cycle fetching takes longer (~30 seconds vs ~18 seconds for our gcp test account) as we wait between calls.
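the tradeoff looks roughly like this (a sketch using golang.org/x/time/rate; the repo's NewAssetsInventoryRateLimiter may work differently):

```go
package inventory

import (
	"context"

	"golang.org/x/time/rate"
)

// All concurrent ListAssets calls share one limiter, so added concurrency
// shows up as waiting time instead of exceeded API quotas.
var limiter = rate.NewLimiter(rate.Limit(1), 1) // 1 request/sec: a hypothetical quota

func listAssets(ctx context.Context) error {
	if err := limiter.Wait(ctx); err != nil { // blocks until a token is available
		return err
	}
	// ... perform the actual ListAssets call ...
	return nil
}
```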

@orouz orouz marked this pull request as ready for review April 6, 2025 11:58
@orouz orouz requested a review from a team as a code owner April 6, 2025 11:58
f.log.Infof("GcpAssetsFetcher.Fetch context err: %s", ctx.Err().Error())
f.log.Info("GcpAssetsFetcher.Fetch start")
defer f.log.Info("GcpAssetsFetcher.Fetch done")
defer f.provider.Clear()
orouz (Collaborator Author) commented:

Clear() clears the cache used for cloud account metadata, which is accessed for every fetched asset (mostly cache hits).

it used to never get cleared at all (see #2182). now it gets cleared when each fetcher exits, which is better but still not ideal, as it should be called once all fetchers are done. a better place would be for the registry to call a Clear method on the Fetcher interface, letting fetchers clear their caches when the fetching cycle is done.

@orouz orouz force-pushed the gcp_ch branch 2 times, most recently from b67b8de to 031d2c7, April 16, 2025 09:53
@orouz orouz linked an issue Apr 21, 2025 that may be closed by this pull request
}

func getOrganizationId(ancestors []string) string {
last := ancestors[len(ancestors)-1]
Member commented:

nit: perhaps add a check for ancestors having zero length (is it possible?)
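a defensive variant might look like this (sketch; the "organizations/" prefix handling is an assumption about the real function body):

```go
package inventory

import "strings"

// getOrganizationIdSafe guards against an empty ancestors slice before
// indexing the last element. The prefix trim below is hypothetical, for
// illustration only.
func getOrganizationIdSafe(ancestors []string) string {
	if len(ancestors) == 0 {
		return ""
	}
	last := ancestors[len(ancestors)-1]
	return strings.TrimPrefix(last, "organizations/")
}
```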

Development

Successfully merging this pull request may close these issues.

CSPM GCP RAM footprint