Skip to content

make GCP provider work concurrently #3152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions internal/flavors/benchmark/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestGCP_Initialize(t *testing.T) {
"gcp_cloud_assets_fetcher",
"gcp_monitoring_fetcher",
"gcp_service_usage_fetcher",
"gcp_network_fetcher",
},
},
{
Expand All @@ -85,6 +86,7 @@ func TestGCP_Initialize(t *testing.T) {
"gcp_cloud_assets_fetcher",
"gcp_monitoring_fetcher",
"gcp_service_usage_fetcher",
"gcp_network_fetcher",
},
},
{
Expand Down
13 changes: 5 additions & 8 deletions internal/inventory/gcpfetcher/fetcher_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type (
provider inventoryProvider
}
inventoryProvider interface {
ListAllAssetTypesByName(ctx context.Context, assets []string) ([]*gcpinventory.ExtendedGcpAsset, error)
ListAssetTypes(ctx context.Context, assets []string, assetsCh chan<- *gcpinventory.ExtendedGcpAsset)
}
ResourcesClassification struct {
assetType string
Expand Down Expand Up @@ -79,14 +79,11 @@ func (f *assetsInventory) fetch(ctx context.Context, assetChan chan<- inventory.
f.logger.Infof("Fetching %s", assetType)
defer f.logger.Infof("Fetching %s - Finished", assetType)

gcpAssets, err := f.provider.ListAllAssetTypesByName(ctx, []string{assetType})
if err != nil {
f.logger.Errorf("Could not fetch %s: %v", assetType, err)
return
}
resultsCh := make(chan *gcpinventory.ExtendedGcpAsset)
go f.provider.ListAssetTypes(ctx, []string{assetType}, resultsCh)

for _, item := range gcpAssets {
assetChan <- getAssetEvent(classification, item)
for asset := range resultsCh {
assetChan <- getAssetEvent(classification, asset)
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/inventory/gcpfetcher/fetcher_assets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func TestAccountFetcher_Fetch_Assets(t *testing.T) {

provider := newMockInventoryProvider(t)
for _, resource := range ResourcesToFetch {
provider.EXPECT().mock.On("ListAllAssetTypesByName", mock.Anything, []string{resource.assetType}).Return([]*gcpinventory.ExtendedGcpAsset{createAsset(resource.assetType)}, nil)
provider.EXPECT().mock.On("ListAssetTypes", mock.Anything, []string{resource.assetType}, mock.Anything).Run(func(args mock.Arguments) {
ch := args.Get(2).(chan<- *gcpinventory.ExtendedGcpAsset)
ch <- createAsset(resource.assetType)
close(ch)
})
}
fetcher := newAssetsInventoryFetcher(logger, provider)
testutil.CollectResourcesAndMatch(t, fetcher, expected)
Expand Down
54 changes: 15 additions & 39 deletions internal/inventory/gcpfetcher/mock_inventory_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 41 additions & 59 deletions internal/resources/fetching/fetchers/gcp/assets_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"github.com/huandu/xstrings"
"github.com/samber/lo"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/cloudbeat/internal/infra/clog"
Expand All @@ -48,41 +49,25 @@ type GcpAsset struct {
// map of types to asset types.
// sub-type is derived from asset type by using the first and last segments of the asset type name
// example: gcp-cloudkms-crypto-key
var GcpAssetTypes = map[string][]string{
fetching.ProjectManagement: {
inventory.CrmProjectAssetType,
},
fetching.KeyManagement: {
inventory.ApiKeysKeyAssetType,
inventory.CloudKmsCryptoKeyAssetType,
},
fetching.CloudIdentity: {
inventory.IamServiceAccountAssetType,
inventory.IamServiceAccountKeyAssetType,
},
fetching.CloudDatabase: {
inventory.BigqueryDatasetAssetType,
inventory.BigqueryTableAssetType,
inventory.SqlDatabaseInstanceAssetType,
},
fetching.CloudStorage: {
inventory.StorageBucketAssetType,
inventory.LogBucketAssetType,
},
fetching.CloudCompute: {
inventory.ComputeInstanceAssetType,
inventory.ComputeFirewallAssetType,
inventory.ComputeDiskAssetType,
inventory.ComputeNetworkAssetType,
inventory.ComputeBackendServiceAssetType,
inventory.ComputeSubnetworkAssetType,
},
fetching.CloudDns: {
inventory.DnsManagedZoneAssetType,
},
fetching.DataProcessing: {
inventory.DataprocClusterAssetType,
},

// reversedGcpAssetTypes maps each GCP inventory asset type to the fetching
// type it is reported under. Several asset types may share one fetching type.
// Fetch requests the keys of this map from the provider and uses the map to
// resolve the Type of every asset it receives.
var reversedGcpAssetTypes = map[string]string{
inventory.CrmProjectAssetType: fetching.ProjectManagement,
inventory.ApiKeysKeyAssetType: fetching.KeyManagement,
inventory.CloudKmsCryptoKeyAssetType: fetching.KeyManagement,
inventory.IamServiceAccountAssetType: fetching.CloudIdentity,
inventory.IamServiceAccountKeyAssetType: fetching.CloudIdentity,
inventory.BigqueryDatasetAssetType: fetching.CloudDatabase,
inventory.BigqueryTableAssetType: fetching.CloudDatabase,
inventory.SqlDatabaseInstanceAssetType: fetching.CloudDatabase,
inventory.StorageBucketAssetType: fetching.CloudStorage,
inventory.LogBucketAssetType: fetching.CloudStorage,
inventory.ComputeInstanceAssetType: fetching.CloudCompute,
inventory.ComputeFirewallAssetType: fetching.CloudCompute,
inventory.ComputeDiskAssetType: fetching.CloudCompute,
inventory.ComputeBackendServiceAssetType: fetching.CloudCompute,
inventory.ComputeSubnetworkAssetType: fetching.CloudCompute,
inventory.DnsManagedZoneAssetType: fetching.CloudDns,
inventory.DataprocClusterAssetType: fetching.DataProcessing,
}

func NewGcpAssetsFetcher(_ context.Context, log *clog.Logger, ch chan fetching.ResourceInfo, provider inventory.ServiceAPI) *GcpAssetsFetcher {
Expand All @@ -94,32 +79,29 @@ func NewGcpAssetsFetcher(_ context.Context, log *clog.Logger, ch chan fetching.R
}

func (f *GcpAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpAssetsFetcher.Fetch")

for typeName, assetTypes := range GcpAssetTypes {
assets, err := f.provider.ListAllAssetTypesByName(ctx, assetTypes)
if err != nil {
f.log.Errorf("Failed to list assets for type %s: %s", typeName, err.Error())
continue
}

for _, asset := range assets {
select {
case <-ctx.Done():
f.log.Infof("GcpAssetsFetcher.Fetch context err: %s", ctx.Err().Error())
return nil
case f.resourceCh <- fetching.ResourceInfo{
CycleMetadata: cycleMetadata,
Resource: &GcpAsset{
Type: typeName,
SubType: getGcpSubType(asset.AssetType),
ExtendedAsset: asset,
},
}:
}
f.log.Info("GcpAssetsFetcher.Fetch start")
defer f.log.Info("GcpAssetsFetcher.Fetch done")
defer f.provider.Clear()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clear() empties the cache used for cloud account metadata, which is read for every fetched asset (almost always a cache hit).

Previously, this cache was never cleared at all (see #2182).
Now it is cleared whenever a fetcher exits, which is better but still not ideal: it should be cleared only once all fetchers are done. I think a better approach would be for the registry to call a Clear method on the Fetcher interface, allowing fetchers to clean up their caches when the fetching cycle is done.


resultsCh := make(chan *inventory.ExtendedGcpAsset)
go f.provider.ListAssetTypes(ctx, lo.Keys(reversedGcpAssetTypes), resultsCh)

for asset := range resultsCh {
select {
case <-ctx.Done():
f.log.Debugf("GcpAssetsFetcher.Fetch context done: %v", ctx.Err())
return nil

case f.resourceCh <- fetching.ResourceInfo{
CycleMetadata: cycleMetadata,
Resource: &GcpAsset{
Type: reversedGcpAssetTypes[asset.AssetType],
SubType: getGcpSubType(asset.AssetType),
ExtendedAsset: asset,
},
}:
}
}

return nil
}

Expand Down
77 changes: 35 additions & 42 deletions internal/resources/fetching/fetchers/gcp/assets_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package fetchers

import (
"context"
"sync"
"testing"

"cloud.google.com/go/asset/apiv1/assetpb"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -54,53 +54,46 @@ func (s *GcpAssetsFetcherTestSuite) TearDownTest() {
}

func (s *GcpAssetsFetcherTestSuite) TestFetcher_Fetch() {
ctx := context.Background()
mockInventoryService := &inventory.MockServiceAPI{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
mockInventoryService := new(inventory.MockServiceAPI)
fetcher := GcpAssetsFetcher{
log: testhelper.NewLogger(s.T()),
resourceCh: s.resourceCh,
provider: mockInventoryService,
}

mockInventoryService.EXPECT().ListAllAssetTypesByName(mock.Anything, mock.MatchedBy(func(_ []string) bool {
return true
})).Return(
[]*inventory.ExtendedGcpAsset{
{
CloudAccount: &fetching.CloudAccountMetadata{
AccountId: "prjId",
AccountName: "prjName",
OrganisationId: "orgId",
OrganizationName: "orgName",
},
Asset: &assetpb.Asset{
Name: "a", AssetType: "iam.googleapis.com/ServiceAccount",
},
},
}, nil,
)

err := fetcher.Fetch(ctx, cycle.Metadata{})
s.Require().NoError(err)
results := testhelper.CollectResources(s.resourceCh)

s.Len(results, len(GcpAssetTypes))

lo.ForEach(results, func(r fetching.ResourceInfo, _ int) {
metadata, err := r.Resource.GetMetadata()
s.Require().NoError(err)
cloudAccountMetadata := metadata.CloudAccountMetadata

s.Equal("prjName", cloudAccountMetadata.AccountName)
s.Equal("prjId", cloudAccountMetadata.AccountId)
s.Equal("orgId", cloudAccountMetadata.OrganisationId)
s.Equal("orgName", cloudAccountMetadata.OrganizationName)
if metadata.Type == fetching.CloudIdentity {
m, err := r.GetElasticCommonData()
s.Require().NoError(err, "error getting Elastic Common Data")
s.Len(m, 2)
}
})
expectedAsset := &inventory.ExtendedGcpAsset{
Asset: &assetpb.Asset{
AssetType: "compute.googleapis.com/Instance",
},
}
mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListAssetTypes", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(2).(chan<- *inventory.ExtendedGcpAsset)
ch <- expectedAsset
close(ch)
}).Once()

wg.Add(1)
go func() {
defer wg.Done()
err := fetcher.Fetch(ctx, cycle.Metadata{})
s.NoError(err)
}()

res := <-s.resourceCh
s.NotNil(res.Resource)
asset, ok := res.Resource.(*GcpAsset)
s.True(ok)
s.Equal(expectedAsset, asset.ExtendedAsset)
s.Equal("cloud-compute", asset.Type)
s.Equal("gcp-compute-instance", asset.SubType)

wg.Wait()
mockInventoryService.AssertExpectations(s.T())
}

func (s *GcpAssetsFetcherTestSuite) TestFetcher_ElasticCommonData() {
Expand Down
Loading