Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add resource dataproc jobs controller #4231

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
8 changes: 4 additions & 4 deletions apis/dataproc/v1alpha1/job_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func (p *JobParent) String() string {
func NewJobIdentity(ctx context.Context, reader client.Reader, obj *DataprocJob) (*JobIdentity, error) {

// Get Parent
projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.DataprocJobParent.ProjectRef)
projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef)
if err != nil {
return nil, err
}
projectID := projectRef.ProjectID
if projectID == "" {
return nil, fmt.Errorf("cannot resolve project")
}
location := obj.Spec.DataprocJobParent.Region
location := obj.Spec.Location

// Get desired ID
resourceID := common.ValueOf(obj.Spec.ResourceID)
Expand Down Expand Up @@ -105,8 +105,8 @@ func NewJobIdentity(ctx context.Context, reader client.Reader, obj *DataprocJob)

func ParseJobExternal(external string) (parent *JobParent, resourceID string, err error) {
tokens := strings.Split(external, "/")
if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "locations" || tokens[4] != "jobs" {
return nil, "", fmt.Errorf("format of DataprocJob external=%q was not known (use projects/{{projectID}}/locations/{{location}}/jobs/{{jobID}})", external)
if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "regions" || tokens[4] != "jobs" {
return nil, "", fmt.Errorf("format of DataprocJob external=%q was not known (use projects/{{projectID}}/regions/{{location}}/jobs/{{jobID}})", external)
}
parent = &JobParent{
ProjectID: tokens[1],
Expand Down
12 changes: 1 addition & 11 deletions apis/dataproc/v1alpha1/job_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package v1alpha1

import (
refv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -96,21 +95,12 @@ type DataprocJobSpec struct {
DriverSchedulingConfig *DriverSchedulingConfig `json:"driverSchedulingConfig,omitempty"`

// Required.
*DataprocJobParent `json:"parent,omitempty"`
*Parent `json:"parent,omitempty"`

// The DataprocJob name. If not given, the metadata.name will be used.
ResourceID *string `json:"resourceID,omitempty"`
}

// DataprocJobParent identifies the parent under which a Dataproc job lives:
// the owning GCP project and the Dataproc region that handles requests for it.
type DataprocJobParent struct {

// Required. The ID of the Google Cloud Platform project that the job belongs to.
ProjectRef *refv1beta1.ProjectRef `json:"projectRef,omitempty"`

// Required. The Dataproc region in which to handle the request.
Region string `json:"region,omitempty"`
}

// +kcc:proto=google.cloud.dataproc.v1.FlinkJob
type FlinkJob struct {
// The HCFS URI of the jar file that contains the main class.
Expand Down
26 changes: 3 additions & 23 deletions apis/dataproc/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ spec:
parent:
description: Required.
properties:
location:
description: Required.
type: string
projectRef:
description: Required. The ID of the Google Cloud Platform project
that the job belongs to.
description: Required.
oneOf:
- not:
required:
Expand Down Expand Up @@ -274,10 +276,6 @@ spec:
description: The `namespace` field of a `Project` resource.
type: string
type: object
region:
description: Required. The Dataproc region in which to handle
the request.
type: string
type: object
pigJob:
description: Optional. Job is a Pig job.
Expand Down
9 changes: 7 additions & 2 deletions config/tests/samples/create/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,9 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured
if gvk.Group == "" && gvk.Kind == "MockGCPBackdoor" {
continue
}
if name == "dclbasedresourceserviceaccountref" {
t.Skip()
}

switch gvk.Group {
case "core.cnrm.cloud.google.com":
Expand Down Expand Up @@ -851,6 +854,10 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured

case schema.GroupKind{Group: "documentai.cnrm.cloud.google.com", Kind: "DocumentAIProcessorVersion"}:

case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocCluster"}:
case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocJob"}:
case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocBatch"}:

case schema.GroupKind{Group: "discoveryengine.cnrm.cloud.google.com", Kind: "DiscoveryEngineDataStore"}:

case schema.GroupKind{Group: "iam.cnrm.cloud.google.com", Kind: "IAMPartialPolicy"}:
Expand Down Expand Up @@ -977,8 +984,6 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured

case schema.GroupKind{Group: "apphub.cnrm.cloud.google.com", Kind: "AppHubApplication"}:

case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocBatch"}:

case schema.GroupKind{Group: "recaptchaenterprise.cnrm.cloud.google.com", Kind: "ReCAPTCHAEnterpriseFirewallPolicy"}:

default:
Expand Down
14 changes: 14 additions & 0 deletions mockgcp/mockdataproc/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (s *clusterControllerServer) CreateCluster(ctx context.Context, req *pb.Cre
s.setStatus(obj, pb.ClusterStatus_RUNNING)

obj.Config.EndpointConfig = &pb.EndpointConfig{}
if obj.Config.GceClusterConfig == nil {
obj.Config.GceClusterConfig = &pb.GceClusterConfig{}
}
obj.Config.GceClusterConfig.InternalIpOnly = PtrTo(true)
obj.Config.GceClusterConfig.NetworkUri = "https://www.googleapis.com/compute/v1/projects/" + name.Project.ID + "/global/networks/default"
obj.Config.GceClusterConfig.ServiceAccountScopes = []string{"https://www.googleapis.com/auth/cloud-platform"}
Expand All @@ -156,6 +159,8 @@ func (s *clusterControllerServer) CreateCluster(ctx context.Context, req *pb.Cre
"goog-dataproc-cluster-uuid": obj.ClusterUuid,
"goog-dataproc-location": name.Region,
"goog-drz-dataproc-uuid": "cluster-" + obj.ClusterUuid,
"managed-by-cnrm": "true",
"cnrm-test": "true",
}
return nil
})
Expand Down Expand Up @@ -204,6 +209,9 @@ func (s *clusterControllerServer) populateDefaultsForCluster(obj *pb.Cluster, na
if obj.Config.WorkerConfig == nil {
obj.Config.WorkerConfig = &pb.InstanceGroupConfig{}
}
if obj.Config.WorkerConfig.DiskConfig == nil {
obj.Config.WorkerConfig.DiskConfig = &pb.DiskConfig{}
}
obj.Config.WorkerConfig.DiskConfig.BootDiskSizeGb = 1000
obj.Config.WorkerConfig.DiskConfig.BootDiskType = "pd-standard"
obj.Config.WorkerConfig.MachineTypeUri = "https://www.googleapis.com/compute/v1/projects/" + name.Project.ID + "/zones/us-central1-c/machineTypes/n2-standard-4"
Expand Down Expand Up @@ -292,6 +300,12 @@ func (s *clusterControllerServer) UpdateCluster(ctx context.Context, req *pb.Upd
ret.StatusHistory = nil
ret.Config.WorkerConfig.InstanceNames = nil
ret.Config.MasterConfig.InstanceNames = nil

s.setStatus(updated, pb.ClusterStatus_RUNNING)

if err := s.storage.Update(ctx, fqn, updated); err != nil {
return nil, err
}
return ret, nil
})
}
Expand Down
Loading
Loading