make GCP provider work concurrently #3152
base: main
Conversation
This pull request is now in conflicts. Could you fix it? 🙏
This pull request does not have a backport label. Could you fix it @orouz? 🙏
Force-pushed from 1d5beb8 to b0b857e
This is a new fetcher for network assets. It was added to avoid the enrichment attempt we previously made on all assets instead of just network assets:
p.enrichNetworkAssets(ctx, extendedAssets)
Network assets are now fetched and enriched in their own fetcher, which avoids that attempt when fetching all other assets in assets_fetcher.
ListLoggingAssets(ctx context.Context) ([]*LoggingAsset, error)

// ListServiceUsageAssets returns a list of service usage assets grouped by project id
ListServiceUsageAssets(ctx context.Context) ([]*ServiceUsageAsset, error)
These two were combined into ListProjectAssets.
Removed:
var assets []*assetpb.Asset
assets = append(append(assets, resourceAssets...), policyAssets...)
mergedAssets := mergeAssetContentType(assets)
extendedAssets := p.extendWithCloudMetadata(ctx, mergedAssets)
// Enrich network assets with dns policy
p.enrichNetworkAssets(ctx, extendedAssets)
return extendedAssets, nil

Added:
resourceCh := make(chan *assetpb.Asset) // *assetpb.Asset with Resource
policyCh := make(chan *assetpb.Asset)   // *assetpb.Asset with IamPolicy
mergeCh := make(chan *assetpb.Asset)    // *assetpb.Asset with Resource and IamPolicy
enrichCh := make(chan *ExtendedGcpAsset)

go p.getAllAssets(ctx, p.config.Parent, assetpb.ContentType_RESOURCE, assetTypes, resourceCh)
go p.getAllAssets(ctx, p.config.Parent, assetpb.ContentType_IAM_POLICY, assetTypes, policyCh)
go p.mergeAssets(ctx, mergeCh, resourceCh, policyCh)
go p.enrichAssets(ctx, mergeCh, enrichCh)

for asset := range enrichCh {
    out <- asset
}
This method is used by the main fetcher (assets_fetcher). The way it works is that we start fetching resources and policies and then merge them by name. The merging process sends an asset to the enrichment channel when: 1) the asset has both a resource and a policy, or 2) it has only one of the two and the other channel is already closed, so we don't need to wait to merge it with the other content type.
After an asset is enriched it is sent to the out channel, which is received in the fetcher and passed to the resourceCh to start the relevant flavor (CSPM / Assets Inventory) pipeline.
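For readers of this thread, a minimal sketch of the merge step described above, assuming the gcplib inventory package context; mergeAssetPair is a hypothetical helper, and the real mergeAssets may be structured differently (for example, it may flush unpaired assets earlier):

```go
// Sketch of the merge step: pair RESOURCE and IAM_POLICY assets by name and
// forward a merged asset as soon as both halves arrive; once both inputs are
// closed, forward whatever never found a counterpart.
func (p *Provider) mergeAssets(ctx context.Context, out chan<- *assetpb.Asset, resourceCh, policyCh <-chan *assetpb.Asset) {
    defer close(out)
    pending := map[string]*assetpb.Asset{} // assets still waiting for the other content type
    for resourceCh != nil || policyCh != nil {
        var asset *assetpb.Asset
        select {
        case <-ctx.Done():
            return
        case a, ok := <-resourceCh:
            if !ok {
                resourceCh = nil // a nil channel blocks forever, removing it from the select
                continue
            }
            asset = a
        case a, ok := <-policyCh:
            if !ok {
                policyCh = nil
                continue
            }
            asset = a
        }
        if other, found := pending[asset.Name]; found {
            out <- mergeAssetPair(other, asset) // hypothetical helper: combine Resource + IamPolicy
            delete(pending, asset.Name)
        } else {
            pending[asset.Name] = asset
        }
    }
    // both inputs are closed, so these assets have only one content type; send them as-is
    for _, asset := range pending {
        out <- asset
    }
}
```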
logMetrics, err := p.ListAllAssetTypesByName(ctx, monitoringAssetTypes["LogMetric"])
if err != nil {
    return nil, err
func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
This method used to fetch all resources and policies for two asset types, group them by project, and, once all assets were grouped, return a slice with all the groups.
It now does the same thing, but sends each group as soon as it is ready instead of waiting for all of them. Also, only resources are fetched now (no policies): the CIS rules don't require them, and not all of these asset types necessarily have a policy, so fetching them was redundant.
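A hedged sketch of the streaming shape described here; the helpers (getProjects, listResources) and the MonitoringAsset field names are illustrative assumptions, not the actual implementation:

```go
// Sketch: one way to make each project's group "ready" early is to iterate
// projects, fetch only the RESOURCE content of the two monitoring asset types
// for that project, and emit the group immediately instead of returning one
// slice with all groups at the end.
func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
    defer close(out) // lets the consuming fetcher's range loop terminate
    for _, project := range p.getProjects(ctx) { // hypothetical helper
        logMetrics := p.listResources(ctx, project, "logging.googleapis.com/LogMetric")        // hypothetical helper
        alertPolicies := p.listResources(ctx, project, "monitoring.googleapis.com/AlertPolicy")
        select {
        case <-ctx.Done():
            return
        case out <- &MonitoringAsset{ // field names assumed for illustration
            CloudAccount: project,
            LogMetrics:   logMetrics,
            Alerts:       alertPolicies,
        }:
        }
    }
}
```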
if err != nil {
    return nil, err
}
func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset) {
This used to send a slice of items, each representing a group of assets from the same project. It now does the same, but sends each group separately so we don't wait for all of them.
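On the consuming side, a fetcher can then simply range over the channel and handle each per-project group as it arrives; a minimal sketch, where the handle callback stands in for the real fetcher wiring:

```go
// policyLister captures just the method we need from the provider (assumed to
// match the new ListProjectsAncestorsPolicies signature shown in this diff).
type policyLister interface {
    ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset)
}

// Sketch of the consuming side: drain per-project policy groups as the provider
// emits them and hand each one downstream without waiting for the rest.
// The provider is expected to close resultsCh when it is done.
func consumePolicyGroups(ctx context.Context, provider policyLister, handle func(*ProjectPoliciesAsset)) {
    resultsCh := make(chan *ProjectPoliciesAsset)
    go provider.ListProjectsAncestorsPolicies(ctx, resultsCh)
    for group := range resultsCh { // terminates once the provider closes resultsCh
        handle(group)
    }
}
```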
var assets []*ExtendedGcpAsset
assets = append(append(assets, logMetrics...), alertPolicies...)
monitoringAssets := getAssetsByProject[MonitoringAsset](assets, p.log, typeGenerator)
func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
This function now handles what ListLoggingAssets and ListServiceUsageAssets used to do. Both of them fetched the resource and the policy for some asset types, grouped everything by project, and sent a slice containing all the groups.
It does the same thing now, but starts by fetching the projects and then, for each project, fetches only the resources of the given asset types and sends them out. The difference is that we don't wait for all assets from all projects, and we no longer fetch policies, since they weren't used by the relevant CIS rules.
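Roughly, the flow described here looks like the sketch below; listProjects, listResources and the ProjectAssets field names are illustrative stand-ins, not the actual code:

```go
// Sketch: list projects first, then for each project fetch only the RESOURCE
// content of the requested asset types and emit the group immediately,
// without waiting for the other projects (and without fetching IAM policies).
func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
    defer close(out)
    for _, project := range p.listProjects(ctx) { // hypothetical helper
        var assets []*ExtendedGcpAsset
        for _, assetType := range assetTypes {
            assets = append(assets, p.listResources(ctx, project, assetType)...) // hypothetical helper
        }
        select {
        case <-ctx.Done():
            return
        case out <- &ProjectAssets{CloudAccount: project, Assets: assets}: // fields assumed
        }
    }
}
```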
Pull Request Overview
This PR enhances the GCP provider to run its workflows concurrently by refactoring asset fetchers to use channels and goroutines. Key changes include:
- Refactoring of multiple fetchers (Service Usage, Policies, Monitoring, Networks, Log Sink, and Assets) to use concurrent channel-based asset delivery.
- Renaming and updating mock functions to support the new asynchronous calls.
- Updating tests to reflect the new concurrent behavior and added network asset support.
Reviewed Changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated no comments.
File | Description
---|---
internal/resources/providers/gcplib/inventory/mock_service_api.go | Added a Clear function and renamed mock helper functions to support the new ListAssetTypes method.
internal/resources/fetching/preset/gcp_preset.go | Added initialization for the network assets fetcher.
internal/resources/fetching/fetchers/gcp/* | Modified fetchers (Service Usage, Policies, Monitoring, Networks, Log Sink, Assets) to run concurrently via channels.
internal/inventory/gcpfetcher/* | Updated the asset fetcher and mock provider functions to replace ListAllAssetTypesByName with ListAssetTypes, and adjusted tests accordingly.
Comments suppressed due to low confidence (2)
internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go:38
- [nitpick] Consider standardizing the naming of the asset subtype field across fetchers. In some files the field is named 'subType' (e.g. here) while in others it is 'SubType'; standardizing this will improve code consistency.
subType string
internal/resources/fetching/fetchers/gcp/assets_fetcher.go:86
- Ensure that the implementation of ListAssetTypes in the provider always closes the results channel; otherwise, the for-select loop may hang if the channel is never closed.
go f.provider.ListAssetTypes(ctx, lo.Keys(reversedGcpAssetTypes), resultsCh)
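A hedged sketch of the convention that addresses this concern: the producer owns the results channel and closes it on every exit path, so the consuming loop always terminates. The channel element type and helper names below are assumptions for illustration:

```go
// Producer side: the provider method that owns the channel closes it on every
// return path, including context cancellation, so consumers can never hang.
func (p *Provider) ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset) {
    defer close(out) // guarantees the consumer's loop below terminates
    // ... fetch, merge and enrich assets, checking ctx.Done() between steps ...
}

// Consumer side in the fetcher: exits when the channel is closed or the context is done.
func drainAssets(ctx context.Context, resultsCh <-chan *ExtendedGcpAsset, forward func(*ExtendedGcpAsset)) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case asset, ok := <-resultsCh:
            if !ok {
                return nil // producer closed resultsCh
            }
            forward(asset)
        }
    }
}
```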
Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error)
}

func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
func newAssetsInventoryWrapper(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (*AssetsInventoryWrapper, error) {
    limiter := NewAssetsInventoryRateLimiter(log)
We use rate limiting for the GCP Assets Inventory ListAssets method, and because we're now making more concurrent calls, the fetching cycle takes longer (~30 seconds vs ~18 seconds for our GCP test account) as we wait between calls.
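For context, the slowdown comes from a token-bucket limiter gating each ListAssets call; a minimal standalone sketch of that pattern using golang.org/x/time/rate (not the project's actual NewAssetsInventoryRateLimiter) looks like this:

```go
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // Illustrative quota: roughly one request per second with no burst headroom.
    // With several goroutines fetching concurrently, each call still waits its
    // turn here, so the overall cycle can take longer even though the work
    // around the API calls now runs in parallel.
    limiter := rate.NewLimiter(rate.Every(time.Second), 1)
    ctx := context.Background()
    for i := 0; i < 3; i++ {
        if err := limiter.Wait(ctx); err != nil { // blocks until a token is available
            return
        }
        fmt.Println("ListAssets page request", i)
    }
}
```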
f.log.Infof("GcpAssetsFetcher.Fetch context err: %s", ctx.Err().Error()) | ||
f.log.Info("GcpAssetsFetcher.Fetch start") | ||
defer f.log.Info("GcpAssetsFetcher.Fetch done") | ||
defer f.provider.Clear() |
Clear() clears the cache used for cloud account metadata, which is accessed for all fetched assets (mostly cache hits).
It previously never got cleared at all (see #2182). Now it gets cleared when each fetcher exits, which is better but still not ideal, since it should only be called once all fetchers are done. I think a better place would be for the registry to call a Clear method on the Fetcher interface, allowing fetchers to clear their caches when the fetching cycle is done.
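A rough sketch of that suggestion, with the registry calling Clear once the whole cycle is done; the interface extension and registry shape here are hypothetical, not the current code:

```go
// Hypothetical extension: fetchers that hold per-cycle caches implement Clear,
// and the registry invokes it once after every fetcher of the cycle has returned,
// instead of each fetcher clearing the shared metadata cache on its own exit.
type Fetcher interface {
    Fetch(ctx context.Context, metadata cycle.Metadata) error
    Stop()
    Clear() // hypothetical addition: release per-cycle caches (e.g. cloud account metadata)
}

func (r *Registry) onCycleDone() { // hypothetical registry hook, run after all fetchers finish
    for _, f := range r.fetchers {
        f.Clear()
    }
}
```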
Force-pushed from b67b8de to 031d2c7
}

func getOrganizationId(ancestors []string) string {
    last := ancestors[len(ancestors)-1]
nit: perhaps add a check for ancestors having zero length (is that even possible?)
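A small sketch of the suggested guard, assuming the organization id is taken from the last ancestor entry of the form "organizations/<id>" (using strings.TrimPrefix; the real parsing may differ):

```go
import "strings"

// getOrganizationId returns the organization id derived from the last ancestor
// entry ("organizations/<id>"), or an empty string if the ancestry is empty.
func getOrganizationId(ancestors []string) string {
    if len(ancestors) == 0 {
        return "" // guard for the (possibly impossible) empty-ancestors case
    }
    last := ancestors[len(ancestors)-1]
    return strings.TrimPrefix(last, "organizations/")
}
```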
Summary of your changes
The core of this PR is to make any type of work done in the gcplib provider run concurrently. This means there's a pipeline of fetching, merging and enriching, and once it's done, the assets are sent to the fetcher, which in turn sends them to the resourceCh of the flavor pipeline (CSPM / Assets Inventory).
Specific and notable changes are mentioned in the PR comments.
Screenshot/Data
CSPM GCP: same total findings count (1149) and same findings count per rule (with same passed/failed count) as in live env (elastic-security-test)
Assets Inventory GCP: 19 more assets than in the live env (elastic-security-test), due to the network assets addition, which is on `8.x`/`main` and not `8.17`
organization-account mode for 7 consecutive runs (5 min period) with the same number of assets in each cycle
Related Issues