Skip to content

HIVE-2849: Migrate AWS SDK to v2 #2695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions contrib/pkg/awsprivatelink/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"os/user"
"path/filepath"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"

configv1 "github.com/openshift/api/config/v1"
hivev1 "github.com/openshift/hive/apis/hive/v1"
"github.com/openshift/hive/contrib/pkg/awsprivatelink/common"
awsutils "github.com/openshift/hive/contrib/pkg/utils/aws"
"github.com/openshift/hive/pkg/awsclient"
awsclient "github.com/openshift/hive/pkg/awsclientv2"
operatorutils "github.com/openshift/hive/pkg/operator/hive"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -119,10 +120,10 @@ func (o *enableOptions) Run(cmd *cobra.Command, args []string) error {
// Get active cluster's VPC, filtering by infra-id
targetTagKey := "kubernetes.io/cluster/" + o.infraId
describeVPCsOutput, err := o.awsClients.DescribeVpcs(&ec2.DescribeVpcsInput{
Filters: []*ec2.Filter{
Filters: []ec2types.Filter{
{
Name: aws.String("tag-key"),
Values: []*string{aws.String(targetTagKey)},
Values: []string{targetTagKey},
},
},
})
Expand Down
67 changes: 36 additions & 31 deletions contrib/pkg/awsprivatelink/endpointvpc/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package endpointvpc

import (
"context"
"errors"
"fmt"
"reflect"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/smithy-go"

hivev1 "github.com/openshift/hive/apis/hive/v1"
"github.com/openshift/hive/contrib/pkg/awsprivatelink/common"
awsutils "github.com/openshift/hive/contrib/pkg/utils/aws"
"github.com/openshift/hive/pkg/awsclient"
awsclient "github.com/openshift/hive/pkg/awsclientv2"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -108,15 +110,15 @@ func (o *endpointVPCAddOptions) Complete(cmd *cobra.Command, args []string) erro
func (o *endpointVPCAddOptions) Validate(cmd *cobra.Command, args []string) error {
// Check if the endpoint VPC exists
if _, err := o.endpointVpcClients.DescribeVpcs(&ec2.DescribeVpcsInput{
VpcIds: []*string{aws.String(o.endpointVpcId)},
VpcIds: []string{o.endpointVpcId},
}); err != nil {
log.WithError(err).Fatal("Failed to describe endpoint VPC")
}

// Check if the endpoint subnets belong to the endpoint VPC
err := o.endpointVpcClients.DescribeSubnetsPages(
&ec2.DescribeSubnetsInput{
SubnetIds: aws.StringSlice(o.endpointSubnetIds),
SubnetIds: o.endpointSubnetIds,
},
func(page *ec2.DescribeSubnetsOutput, lastPage bool) bool {
for _, subnet := range page.Subnets {
Expand All @@ -136,7 +138,7 @@ func (o *endpointVPCAddOptions) Validate(cmd *cobra.Command, args []string) erro

func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
// Get default SG of the endpoint VPC
endpointVPCDefaultSG, err := awsutils.GetDefaultSGOfVpc(o.endpointVpcClients, aws.String(o.endpointVpcId))
endpointVPCDefaultSG, err := awsutils.GetDefaultSGOfVpc(o.endpointVpcClients, o.endpointVpcId)
if err != nil {
log.WithError(err).Fatal("Failed to get default SG of the endpoint VPC")
}
Expand Down Expand Up @@ -169,30 +171,27 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
log.Info("Adding route to private route tables of the associated VPC")
if err = addRouteToRouteTables(
associatedVpcClients,
aws.String(associatedVpcId),
associatedVpcId,
endpointVpcCIDR,
vpcPeeringConnectionId,
&ec2.Filter{Name: aws.String("tag:Name"), Values: []*string{aws.String("*private*")}},
ec2types.Filter{Name: aws.String("tag:Name"), Values: []string{"*private*"}},
); err != nil {
log.WithError(err).Fatal("Failed to add route to private route tables of the associated VPC")
}

log.Info("Adding route to route tables of the endpoint subnets")
if err = addRouteToRouteTables(
o.endpointVpcClients,
aws.String(o.endpointVpcId),
o.endpointVpcId,
associatedVpcCIDR,
vpcPeeringConnectionId,
&ec2.Filter{Name: aws.String("association.subnet-id"), Values: aws.StringSlice(o.endpointSubnetIds)},
ec2types.Filter{Name: aws.String("association.subnet-id"), Values: o.endpointSubnetIds},
); err != nil {
log.WithError(err).Fatal("Failed to add route to route tables of the endpoint subnets")
}

// Update SGs
associatedVpcWorkerSG, err := awsutils.GetWorkerSGFromVpcId(
associatedVpcClients,
aws.String(associatedVpcId),
)
associatedVpcWorkerSG, err := awsutils.GetWorkerSGFromVpcId(associatedVpcClients, associatedVpcId)
if err != nil {
log.WithError(err).Fatal("Failed to get worker SG of the associated VPC")
}
Expand All @@ -210,8 +209,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from worker SG of associated VPC %s", associatedVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
var aerr smithy.APIError
switch ok := errors.As(err, &aerr); {
case ok && aerr.ErrorCode() == "InvalidPermission.Duplicate":
log.Warnf("Traffic from the associated VPC's worker SG to the endpoint VPC's default SG is already authorized")
default:
log.WithError(err).Fatal("Failed to authorize traffic from the associated VPC's worker SG to the endpoint VPC's default SG")
Expand All @@ -226,8 +226,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from default SG of endpoint VPC %s", o.endpointVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
var aerr smithy.APIError
switch ok := errors.As(err, &aerr); {
case ok && aerr.ErrorCode() == "InvalidPermission.Duplicate":
log.Warnf("Traffic from the endpoint VPC's default SG to the associated VPC's worker SG is already authorized")
default:
log.WithError(err).Fatal("Failed to authorize traffic from the endpoint VPC's default SG to the associated VPC's worker SG")
Expand All @@ -244,8 +245,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from CIDR block of associated VPC %s", associatedVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
var aerr smithy.APIError
switch ok := errors.As(err, &aerr); {
case ok && aerr.ErrorCode() == "InvalidPermission.Duplicate":
log.Warnf("Traffic from the associated VPC's CIDR block to the endpoint VPC's default SG is already authorized")
default:
log.WithError(err).Fatal("Failed to authorize traffic from the associated VPC's CIDR block to the endpoint VPC's default SG")
Expand All @@ -260,8 +262,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from CIDR block of endpoint VPC %s", o.endpointVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
var aerr smithy.APIError
switch ok := errors.As(err, &aerr); {
case ok && aerr.ErrorCode() == "InvalidPermission.Duplicate":
log.Warnf("Traffic from the endpoint VPC's CIDR block to the associated VPC's worker SG is already authorized")
default:
log.WithError(err).Fatal("Failed to authorize traffic from the endpoint VPC's CIDR block to the associated VPC's worker SG")
Expand All @@ -283,7 +286,7 @@ func (o *endpointVPCAddOptions) addEndpointVpcToHiveConfig() {
var endpointSubnets []hivev1.AWSPrivateLinkSubnet
if err := o.endpointVpcClients.DescribeSubnetsPages(
&ec2.DescribeSubnetsInput{
SubnetIds: aws.StringSlice(o.endpointSubnetIds),
SubnetIds: o.endpointSubnetIds,
},
func(page *ec2.DescribeSubnetsOutput, lastPage bool) bool {
for _, subnet := range page.Subnets {
Expand Down Expand Up @@ -335,13 +338,13 @@ func (o *endpointVPCAddOptions) addEndpointVpcToHiveConfig() {

func addRouteToRouteTables(
vpcClients awsclient.Client,
vpcId, peerCIDR, VpcPeeringConnectionId *string,
additionalFiltersForRouteTables ...*ec2.Filter,
vpcId string, peerCIDR, VpcPeeringConnectionId *string,
additionalFiltersForRouteTables ...ec2types.Filter,
) error {
filters := append([]*ec2.Filter{
filters := append([]ec2types.Filter{
{
Name: aws.String("vpc-id"),
Values: []*string{vpcId},
Values: []string{vpcId},
},
}, additionalFiltersForRouteTables...)

Expand All @@ -358,8 +361,9 @@ func addRouteToRouteTables(
})
if err != nil {
// Proceed if route already exists, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "RouteAlreadyExists":
var aerr smithy.APIError
switch ok := errors.As(err, &aerr); {
case ok && aerr.ErrorCode() == "RouteAlreadyExists":
log.Warnf("Route already exists in route table %v", *routeTable.RouteTableId)
default:
log.WithError(err).Fatalf("Failed to create route for route table %v", *routeTable.RouteTableId)
Expand Down Expand Up @@ -389,10 +393,11 @@ func setupVpcPeeringConnection(
if err != nil {
return nil, err
}
// TODO: Nil pointer check?
log.Debugf("VPC peering connection %v requested", *createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)

err = endpointVpcClients.WaitUntilVpcPeeringConnectionExists(&ec2.DescribeVpcPeeringConnectionsInput{
VpcPeeringConnectionIds: []*string{createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId},
VpcPeeringConnectionIds: []string{*createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId},
})
if err != nil {
return nil, err
Expand Down
Loading