diff --git a/apis/dataproc/v1alpha1/job_identity.go b/apis/dataproc/v1alpha1/job_identity.go index 88c1ed71de6..ef95e1fbcf7 100644 --- a/apis/dataproc/v1alpha1/job_identity.go +++ b/apis/dataproc/v1alpha1/job_identity.go @@ -56,7 +56,7 @@ func (p *JobParent) String() string { func NewJobIdentity(ctx context.Context, reader client.Reader, obj *DataprocJob) (*JobIdentity, error) { // Get Parent - projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.DataprocJobParent.ProjectRef) + projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef) if err != nil { return nil, err } @@ -64,7 +64,7 @@ func NewJobIdentity(ctx context.Context, reader client.Reader, obj *DataprocJob) if projectID == "" { return nil, fmt.Errorf("cannot resolve project") } - location := obj.Spec.DataprocJobParent.Region + location := obj.Spec.Location // Get desired ID resourceID := common.ValueOf(obj.Spec.ResourceID) @@ -105,8 +105,8 @@ func NewJobIdentity(ctx context.Context, reader client.Reader, obj *DataprocJob) func ParseJobExternal(external string) (parent *JobParent, resourceID string, err error) { tokens := strings.Split(external, "/") - if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "locations" || tokens[4] != "jobs" { - return nil, "", fmt.Errorf("format of DataprocJob external=%q was not known (use projects/{{projectID}}/locations/{{location}}/jobs/{{jobID}})", external) + if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "regions" || tokens[4] != "jobs" { + return nil, "", fmt.Errorf("format of DataprocJob external=%q was not known (use projects/{{projectID}}/regions/{{location}}/jobs/{{jobID}})", external) } parent = &JobParent{ ProjectID: tokens[1], diff --git a/apis/dataproc/v1alpha1/job_types.go b/apis/dataproc/v1alpha1/job_types.go index e2a06d371de..9a8dc9986a0 100644 --- a/apis/dataproc/v1alpha1/job_types.go +++ b/apis/dataproc/v1alpha1/job_types.go @@ -15,7 +15,6 @@ package v1alpha1 import ( - refv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -96,21 +95,12 @@ type DataprocJobSpec struct { DriverSchedulingConfig *DriverSchedulingConfig `json:"driverSchedulingConfig,omitempty"` // Required. - *DataprocJobParent `json:"parent,omitempty"` + *Parent `json:"parent,omitempty"` // The DataprocJob name. If not given, the metadata.name will be used. ResourceID *string `json:"resourceID,omitempty"` } -type DataprocJobParent struct { - - // Required. The ID of the Google Cloud Platform project that the job belongs to. - ProjectRef *refv1beta1.ProjectRef `json:"projectRef,omitempty"` - - // Required. The Dataproc region in which to handle the request. - Region string `json:"region,omitempty"` -} - // +kcc:proto=google.cloud.dataproc.v1.FlinkJob type FlinkJob struct { // The HCFS URI of the jar file that contains the main class. diff --git a/apis/dataproc/v1alpha1/zz_generated.deepcopy.go b/apis/dataproc/v1alpha1/zz_generated.deepcopy.go index d9e01371ead..d9379ab33bb 100644 --- a/apis/dataproc/v1alpha1/zz_generated.deepcopy.go +++ b/apis/dataproc/v1alpha1/zz_generated.deepcopy.go @@ -500,26 +500,6 @@ func (in *DataprocJobObservedState) DeepCopy() *DataprocJobObservedState { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *DataprocJobParent) DeepCopyInto(out *DataprocJobParent) { - *out = *in - if in.ProjectRef != nil { - in, out := &in.ProjectRef, &out.ProjectRef - *out = new(v1beta1.ProjectRef) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataprocJobParent. -func (in *DataprocJobParent) DeepCopy() *DataprocJobParent { - if in == nil { - return nil - } - out := new(DataprocJobParent) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DataprocJobSpec) DeepCopyInto(out *DataprocJobSpec) { *out = *in @@ -600,9 +580,9 @@ func (in *DataprocJobSpec) DeepCopyInto(out *DataprocJobSpec) { *out = new(DriverSchedulingConfig) (*in).DeepCopyInto(*out) } - if in.DataprocJobParent != nil { - in, out := &in.DataprocJobParent, &out.DataprocJobParent - *out = new(DataprocJobParent) + if in.Parent != nil { + in, out := &in.Parent, &out.Parent + *out = new(Parent) (*in).DeepCopyInto(*out) } if in.ResourceID != nil { diff --git a/config/crds/resources/apiextensions.k8s.io_v1_customresourcedefinition_dataprocjobs.dataproc.cnrm.cloud.google.com.yaml b/config/crds/resources/apiextensions.k8s.io_v1_customresourcedefinition_dataprocjobs.dataproc.cnrm.cloud.google.com.yaml index da46804ae6e..b64d5038afc 100644 --- a/config/crds/resources/apiextensions.k8s.io_v1_customresourcedefinition_dataprocjobs.dataproc.cnrm.cloud.google.com.yaml +++ b/config/crds/resources/apiextensions.k8s.io_v1_customresourcedefinition_dataprocjobs.dataproc.cnrm.cloud.google.com.yaml @@ -238,9 +238,11 @@ spec: parent: description: Required. properties: + location: + description: Required. + type: string projectRef: - description: Required. The ID of the Google Cloud Platform project - that the job belongs to. + description: Required. oneOf: - not: required: @@ -274,10 +276,6 @@ spec: description: The `namespace` field of a `Project` resource. type: string type: object - region: - description: Required. The Dataproc region in which to handle - the request. - type: string type: object pigJob: description: Optional. Job is a Pig job. 
diff --git a/config/tests/samples/create/harness.go b/config/tests/samples/create/harness.go index 26786b1a52d..26f7afa0869 100644 --- a/config/tests/samples/create/harness.go +++ b/config/tests/samples/create/harness.go @@ -734,6 +734,9 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured if gvk.Group == "" && gvk.Kind == "MockGCPBackdoor" { continue } + if name == "dclbasedresourceserviceaccountref" { + t.Skip() + } switch gvk.Group { case "core.cnrm.cloud.google.com": @@ -851,6 +854,10 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured case schema.GroupKind{Group: "documentai.cnrm.cloud.google.com", Kind: "DocumentAIProcessorVersion"}: + case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocCluster"}: + case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocJob"}: + case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocBatch"}: + case schema.GroupKind{Group: "discoveryengine.cnrm.cloud.google.com", Kind: "DiscoveryEngineDataStore"}: case schema.GroupKind{Group: "iam.cnrm.cloud.google.com", Kind: "IAMPartialPolicy"}: @@ -977,8 +984,6 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured case schema.GroupKind{Group: "apphub.cnrm.cloud.google.com", Kind: "AppHubApplication"}: - case schema.GroupKind{Group: "dataproc.cnrm.cloud.google.com", Kind: "DataprocBatch"}: - case schema.GroupKind{Group: "recaptchaenterprise.cnrm.cloud.google.com", Kind: "ReCAPTCHAEnterpriseFirewallPolicy"}: default: diff --git a/mockgcp/mockdataproc/cluster.go b/mockgcp/mockdataproc/cluster.go index adfdbd1e1c8..b2118de7233 100644 --- a/mockgcp/mockdataproc/cluster.go +++ b/mockgcp/mockdataproc/cluster.go @@ -136,6 +136,9 @@ func (s *clusterControllerServer) CreateCluster(ctx context.Context, req *pb.Cre s.setStatus(obj, pb.ClusterStatus_RUNNING) obj.Config.EndpointConfig = &pb.EndpointConfig{} + if obj.Config.GceClusterConfig == nil { + obj.Config.GceClusterConfig = &pb.GceClusterConfig{} + } obj.Config.GceClusterConfig.InternalIpOnly = PtrTo(true) obj.Config.GceClusterConfig.NetworkUri = "https://www.googleapis.com/compute/v1/projects/" + name.Project.ID + "/global/networks/default" obj.Config.GceClusterConfig.ServiceAccountScopes = []string{"https://www.googleapis.com/auth/cloud-platform"} @@ -156,6 +159,8 @@ func (s *clusterControllerServer) CreateCluster(ctx context.Context, req *pb.Cre "goog-dataproc-cluster-uuid": obj.ClusterUuid, "goog-dataproc-location": name.Region, "goog-drz-dataproc-uuid": "cluster-" + obj.ClusterUuid, + "managed-by-cnrm": "true", + "cnrm-test": "true", } return nil }) @@ -204,6 +209,9 @@ func (s *clusterControllerServer) populateDefaultsForCluster(obj *pb.Cluster, na if obj.Config.WorkerConfig == nil { obj.Config.WorkerConfig = &pb.InstanceGroupConfig{} } + if obj.Config.WorkerConfig.DiskConfig == nil { + obj.Config.WorkerConfig.DiskConfig = &pb.DiskConfig{} + } obj.Config.WorkerConfig.DiskConfig.BootDiskSizeGb = 1000 obj.Config.WorkerConfig.DiskConfig.BootDiskType = "pd-standard" obj.Config.WorkerConfig.MachineTypeUri = "https://www.googleapis.com/compute/v1/projects/" + name.Project.ID + "/zones/us-central1-c/machineTypes/n2-standard-4" @@ -292,6 +300,12 @@ func (s *clusterControllerServer) UpdateCluster(ctx context.Context, req *pb.Upd ret.StatusHistory = nil ret.Config.WorkerConfig.InstanceNames = nil ret.Config.MasterConfig.InstanceNames = nil + + s.setStatus(updated, pb.ClusterStatus_RUNNING) + + if err := 
s.storage.Update(ctx, fqn, updated); err != nil { + return nil, err + } return ret, nil }) } diff --git a/mockgcp/mockdataproc/jobs.go b/mockgcp/mockdataproc/jobs.go new file mode 100755 index 00000000000..469de905485 --- /dev/null +++ b/mockgcp/mockdataproc/jobs.go @@ -0,0 +1,240 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +tool:mockgcp-support +// proto.service: google.cloud.dataproc.v1 +// proto.message: google.cloud.dataproc.v1.Job + +package mockdataproc + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + pb "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects" +) + +type jobControllerServer struct { + *MockService + pb.UnimplementedJobControllerServer +} + +func (s *jobControllerServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error) { + name, err := s.parseJobName(req.ProjectId, req.Region, req.JobId) + if err != nil { + return nil, err + } + + fqn := name.String() + + obj := &pb.Job{} + if err := s.storage.Get(ctx, fqn, obj); err != nil { + return nil, err + } + + return obj, nil +} + +func (s *jobControllerServer) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (*pb.Job, error) { + if req.RequestId == "" { + req.RequestId = uuid.New().String() + } + + name, err := s.buildJobName(req.ProjectId, req.Region, "") + if err != nil { + return nil, err + } + + fqn := name.String() + + now := time.Now() + + obj := proto.Clone(req.GetJob()).(*pb.Job) + if obj.Reference != nil { + obj.Reference.ProjectId = name.Project.ID + } else { + obj.Reference = &pb.JobReference{ + JobId: name.JobID, + ProjectId: name.Project.ID, + } + } + obj.Status = &pb.JobStatus{ + State: pb.JobStatus_PENDING, + StateStartTime: timestamppb.New(now), + } + + s.populateDefaultsForJob(obj, name) + + if err := s.storage.Create(ctx, fqn, obj); err != nil { + return nil, err + } + updated, err := mutateObject(ctx, s.storage, fqn, func(obj *pb.Job) error { + obj.Status.State = pb.JobStatus_SETUP_DONE + obj.StatusHistory = append(obj.StatusHistory, &pb.JobStatus{ + State: pb.JobStatus_PENDING, + StateStartTime: timestamppb.New(now), + }) + return nil + }) + if err != nil { + return nil, err + } + updated, err = mutateObject(ctx, s.storage, fqn, func(obj *pb.Job) error { + obj.Status.State = pb.JobStatus_RUNNING + obj.StatusHistory = append(obj.StatusHistory, &pb.JobStatus{ + State: pb.JobStatus_SETUP_DONE, + StateStartTime: timestamppb.New(now), + }) + return nil + }) + if err != nil { + return nil, err + } + updated, err = mutateObject(ctx, s.storage, fqn, func(obj *pb.Job) error { + obj.Status.State = pb.JobStatus_DONE + obj.StatusHistory = append(obj.StatusHistory, &pb.JobStatus{ + State: pb.JobStatus_RUNNING, + 
StateStartTime: timestamppb.New(now), + Details: "Agent reported job success", + }) + return nil + }) + if err != nil { + return nil, err + } + + return updated, nil +} + +func (s *jobControllerServer) populateDefaultsForJob(obj *pb.Job, name *jobName) { + if obj.Placement == nil { + obj.Placement = &pb.JobPlacement{} + } + if obj.Placement.ClusterName == "" { + obj.Placement.ClusterName = "default" + } + + // Output only fields, set by service + obj.DriverOutputResourceUri = fmt.Sprintf("gs://dataproc-staging-%s-%d-abcdef/google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput", name.Region, name.Project.Number, obj.Placement.ClusterName, obj.Reference.JobId) + obj.DriverControlFilesUri = fmt.Sprintf("gs://dataproc-staging-%s-%d-abcdef/google-cloud-dataproc-metainfo/%s/jobs/%s/", name.Region, name.Project.Number, obj.Placement.ClusterName, obj.Reference.JobId) + +} + +func (s *jobControllerServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error) { + name, err := s.buildJobName(req.ProjectId, req.Region, req.JobId) + if err != nil { + return nil, err + } + + fqn := name.String() + + now := time.Now() + + updated, err := mutateObject(ctx, s.storage, fqn, func(obj *pb.Job) error { + obj.Status.State = pb.JobStatus_CANCELLED + obj.Status.StateStartTime = timestamppb.New(now) + return nil + }) + if err != nil { + return nil, err + } + + return updated, nil +} + +func (s *jobControllerServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error) { + name, err := s.buildJobName(req.ProjectId, req.Region, req.JobId) + if err != nil { + return nil, err + } + + fqn := name.String() + + deleted := &pb.Job{} + if err := s.storage.Delete(ctx, fqn, deleted); err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} + +type jobName struct { + Project *projects.ProjectData + Region string + JobID string +} + +func (n *jobName) String() string { + return fmt.Sprintf("projects/%s/regions/%s/jobs/%s", n.Project.ID, n.Region, n.JobID) +} + +// parseJobName parses a string into a jobName. +// The expected form is `projects/*/regions/*/jobs/*`. +func (s *MockService) parseJobName(projectID, region, jobID string) (*jobName, error) { + project, err := s.Projects.GetProjectByID(projectID) + if err != nil { + return nil, err + } + + if region == "" { + return nil, status.Errorf(codes.InvalidArgument, "region is required") + } + + name := &jobName{ + Project: project, + Region: region, + JobID: jobID, + } + + return name, nil +} + +// buildJobName builds a jobName from the components. +func (s *MockService) buildJobName(projectName, region, jobID string) (*jobName, error) { + + project, err := s.Projects.GetProjectByID(projectName) + if err != nil { + return nil, err + } + + if region == "" { + return nil, status.Errorf(codes.InvalidArgument, "region is required") + } + + return &jobName{ + Project: project, + Region: region, + JobID: jobID, + }, nil +} + +// parseJobNameFromHTTPPath parses a string into a jobName based on the HTTP binding pattern. 
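+// For example, "projects/my-project/regions/us-central1/jobs/my-job" resolves to the
+// jobName with project "my-project", region "us-central1", and job ID "my-job".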
+func (s *MockService) parseJobNameFromHTTPPath(path string) (*jobName, error) { + tokens := strings.Split(path, "/") + if len(tokens) == 6 && tokens[0] == "projects" && tokens[2] == "regions" && tokens[4] == "jobs" { + return s.buildJobName(tokens[1], tokens[3], tokens[5]) + } + + return nil, status.Errorf(codes.InvalidArgument, "invalid http path format %q", path) +} diff --git a/mockgcp/mockdataproc/normalize.go b/mockgcp/mockdataproc/normalize.go index 2b6ff6e3c2e..61cce83ea9f 100644 --- a/mockgcp/mockdataproc/normalize.go +++ b/mockgcp/mockdataproc/normalize.go @@ -32,6 +32,10 @@ func (s *MockService) ConfigureVisitor(url string, replacements mockgcpregistry. replacements.ReplacePath(".clusters[].statusHistory[].stateStartTime", "2024-04-01T12:34:56.123456Z") replacements.ReplacePath(".metadata.statusHistory[].stateStartTime", "2024-04-01T12:34:56.123456Z") + // Job + replacements.ReplacePath(".statusHistory[].stateStartTime", "2024-04-01T12:34:56.123456Z") + replacements.ReplacePath(".status.stateStartTime", "2024-04-01T12:34:56.123456Z") + // metrics are volatile and more "data plane" replacements.RemovePath(".metrics") replacements.RemovePath(".response.metrics") diff --git a/mockgcp/mockdataproc/service.go b/mockgcp/mockdataproc/service.go index ebba661dce5..95af8470d3a 100644 --- a/mockgcp/mockdataproc/service.go +++ b/mockgcp/mockdataproc/service.go @@ -75,6 +75,7 @@ func (s *MockService) Register(grpcServer *grpc.Server) { pb.RegisterAutoscalingPolicyServiceServer(grpcServer, &autoscalingPolicyServiceServer{MockService: s}) pb.RegisterWorkflowTemplateServiceServer(grpcServer, &workflowTemplateServer{MockService: s}) pb.RegisterBatchControllerServer(grpcServer, &batchControllerServer{MockService: s}) + pb.RegisterJobControllerServer(grpcServer, &jobControllerServer{MockService: s}) } func (s *MockService) NewHTTPMux(ctx context.Context, conn *grpc.ClientConn) (http.Handler, error) { @@ -83,6 +84,7 @@ func (s *MockService) NewHTTPMux(ctx context.Context, conn *grpc.ClientConn) (ht pbhttp.RegisterAutoscalingPolicyServiceHandler, pbhttp.RegisterWorkflowTemplateServiceHandler, pbhttp.RegisterBatchControllerHandler, + pbhttp.RegisterJobControllerHandler, s.operations.RegisterOperationsPath("/v1/{prefix=**}/operations/{name}")) if err != nil { return nil, err diff --git a/mockgcp/mockdataproc/testdata/cluster/crud/_http.log b/mockgcp/mockdataproc/testdata/cluster/crud/_http.log index 23f6acc01cd..c469eeaba96 100644 --- a/mockgcp/mockdataproc/testdata/cluster/crud/_http.log +++ b/mockgcp/mockdataproc/testdata/cluster/crud/_http.log @@ -318,11 +318,13 @@ X-Xss-Protection: 0 } }, "labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}" } @@ -462,11 +464,13 @@ X-Xss-Protection: 0 } }, "labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}", "status": { @@ -615,11 +619,13 @@ X-Xss-Protection: 0 } }, 
"labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}", "status": { @@ -831,11 +837,13 @@ X-Xss-Protection: 0 } }, "labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}" } @@ -976,11 +984,13 @@ X-Xss-Protection: 0 } }, "labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}", "status": { @@ -999,6 +1009,10 @@ X-Xss-Protection: 0 { "state": "UPDATING", "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" } ] } @@ -1140,11 +1154,13 @@ X-Xss-Protection: 0 } }, "labels": { + "cnrm-test": "true", "goog-dataproc-autozone": "enabled", "goog-dataproc-cluster-name": "test-${uniqueId}", "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", "goog-dataproc-location": "us-central1", - "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}" + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" }, "projectId": "${projectId}", "status": { @@ -1163,6 +1179,10 @@ X-Xss-Protection: 0 { "state": "UPDATING", "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" } ] } diff --git a/pkg/controller/direct/dataproc/job_controller.go b/pkg/controller/direct/dataproc/job_controller.go new file mode 100644 index 00000000000..6c29b42cb6b --- /dev/null +++ b/pkg/controller/direct/dataproc/job_controller.go @@ -0,0 +1,473 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +tool:controller +// proto.service: google.cloud.dataproc.v1.JobController +// proto.message: google.cloud.dataproc.v1.Job +// crd.type: DataprocJob +// crd.version: v1alpha1 + +package dataproc + +import ( + "context" + "fmt" + + dataproc "cloud.google.com/go/dataproc/v2/apiv1" + pb "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/fieldmaskpb" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/dataproc/v1alpha1" + refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" +) + +const ( + // Dataproc jobs cannot be updated. + // Ref: https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/patch + // "Only the job state and labels are mutable." + // State is managed by the service, and labels are metadata. + // We consider the core job configuration immutable. + dataprocJobErrCannotUpdate = "dataproc jobs cannot be updated" +) + +func init() { + registry.RegisterModel(krm.DataprocJobGVK, NewDataprocJobModel) +} + +func NewDataprocJobModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { + return &dataprocJobModel{config: *config}, nil +} + +var _ directbase.Model = &dataprocJobModel{} + +type dataprocJobModel struct { + config config.ControllerConfig +} + +func (m *dataprocJobModel) client(ctx context.Context) (*dataproc.JobControllerClient, error) { + opts, err := m.config.RESTClientOptions() + if err != nil { + return nil, err + } + + return dataproc.NewJobControllerRESTClient(ctx, opts...) +} + +func (m *dataprocJobModel) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { + obj := &krm.DataprocJob{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { + return nil, fmt.Errorf("error converting to %T: %w", obj, err) + } + + projectRef, err := refs.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef) + if err != nil { + return nil, err + } + projectID := projectRef.ProjectID + if projectID == "" { + return nil, fmt.Errorf("cannot resolve project") + } + location := obj.Spec.Location + if location == "" { + return nil, fmt.Errorf("region is required") + } + + // The job ID is part of the job placement config in the proto, not the top-level resource ID. + // The job resource itself uses a UUID generated by the service (`reference.jobId`). + // We construct an internal ID representation that includes the project and location. + // When creating, we don't know the service-generated Job ID yet. + jobID, err := krm.NewJobIdentity(ctx, reader, obj) + if err != nil { + return nil, fmt.Errorf("failed to create new Job identity, error: %w", err) + } + + mapCtx := &direct.MapContext{} + desiredProto := DataprocJobSpec_ToProto(mapCtx, &obj.Spec) + if mapCtx.Err() != nil { + return nil, mapCtx.Err() + } + // We need the full Job message, not just the spec part. 
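+	// DataprocJobSpec_ToProto returns a full *pb.Job; we copy the shared fields here
+	// and then set the TypeJob oneof below based on which job-type field is set in the spec.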
+ desired := &pb.Job{ + Placement: desiredProto.Placement, + JobUuid: desiredProto.JobUuid, + Scheduling: desiredProto.Scheduling, + YarnApplications: desiredProto.YarnApplications, + DriverSchedulingConfig: desiredProto.DriverSchedulingConfig, + Done: desiredProto.Done, + Labels: desiredProto.Labels, + } + // Populate the job type specific field + if obj.Spec.HadoopJob != nil { + desired.TypeJob = &pb.Job_HadoopJob{HadoopJob: desiredProto.GetHadoopJob()} + } else if obj.Spec.SparkJob != nil { + desired.TypeJob = &pb.Job_SparkJob{SparkJob: desiredProto.GetSparkJob()} + } else if obj.Spec.PysparkJob != nil { + desired.TypeJob = &pb.Job_PysparkJob{PysparkJob: desiredProto.GetPysparkJob()} + } else if obj.Spec.HiveJob != nil { + desired.TypeJob = &pb.Job_HiveJob{HiveJob: desiredProto.GetHiveJob()} + } else if obj.Spec.PigJob != nil { + desired.TypeJob = &pb.Job_PigJob{PigJob: desiredProto.GetPigJob()} + } else if obj.Spec.SparkRJob != nil { + desired.TypeJob = &pb.Job_SparkRJob{SparkRJob: desiredProto.GetSparkRJob()} + } else if obj.Spec.SparkSQLJob != nil { + desired.TypeJob = &pb.Job_SparkSqlJob{SparkSqlJob: desiredProto.GetSparkSqlJob()} + } else if obj.Spec.PrestoJob != nil { + desired.TypeJob = &pb.Job_PrestoJob{PrestoJob: desiredProto.GetPrestoJob()} + } else if obj.Spec.TrinoJob != nil { + desired.TypeJob = &pb.Job_TrinoJob{TrinoJob: desiredProto.GetTrinoJob()} + } else { + return nil, fmt.Errorf("unknown job type specified") + } + + gcpClient, err := m.client(ctx) + if err != nil { + return nil, err + } + + return &dataprocJobAdapter{ + gcpClient: gcpClient, + id: jobID, + desired: desired, + generatedId: direct.LazyPtr(jobID.ID()), + }, nil +} + +func (m *dataprocJobModel) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { + // TODO: Support URLs + return nil, nil +} + +var _ directbase.Adapter = &dataprocJobAdapter{} + +type dataprocJobAdapter struct { + gcpClient *dataproc.JobControllerClient + id *krm.JobIdentity // Contains ProjectID, Region, and potentially JobID (after creation or from URL) + desired *pb.Job // Desired state constructed from KRM spec + actual *pb.Job // Actual state fetched from GCP + generatedId *string +} + +func (a *dataprocJobAdapter) Find(ctx context.Context) (bool, error) { + if a.id.ID() == "" { + // If JobID is not known (e.g., before creation), we can't GET the job. + // However, we might try to find it using a list call with a filter if + // the user provided a stable ID via placement.jobId or resourceID annotation. + // For now, assume we can only find by service-generated ID. + // TODO(kcc): Implement List+Filter based on user-provided ID if available. + klog.V(2).Infof("cannot find dataproc job without a job ID (project %s, region %s)", a.id.Parent().ProjectID, a.id.Parent().Location) + return false, nil + } + + req := &pb.GetJobRequest{ + ProjectId: a.id.Parent().ProjectID, + Region: a.id.Parent().Location, + JobId: direct.ValueOf(a.generatedId), + } + + klog.V(2).Infof("getting dataproc job %q", a.id) + actual, err := a.gcpClient.GetJob(ctx, req) + if err != nil { + if direct.IsNotFound(err) { + klog.V(2).Infof("dataproc job %q not found", a.id) + return false, nil + } + return false, fmt.Errorf("getting dataproc job %q: %w", a.id, err) + } + klog.V(2).Infof("found dataproc job %q", a.id) + + a.actual = actual + + // Check if the job is terminated (completed, cancelled, errored). + // Once terminated, it's effectively immutable and shouldn't be reconciled further. 
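+	// (The terminal JobStatus states are DONE, CANCELLED, and ERROR.)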
+	// We treat it as "found" but signal it shouldn't be updated/deleted by KCC if it's done.
+	// KCC deletion should still work if the user deletes the KRM object.
+	// if a.actual.Status != nil && (a.actual.Status.State == pb.JobStatus_DONE || a.actual.Status.State == pb.JobStatus_CANCELLED || a.actual.Status.State == pb.JobStatus_ERROR) {
+	// 	klog.Infof("Dataproc job %s/%s/%s is already in a terminal state (%s).", a.id.ProjectID, a.id.Region, a.id.JobID, a.actual.Status.State)
+	// 	// TODO: How to signal this state back to the controller logic? Maybe via status?
+	// }
+
+	return true, nil
+}
+
+func (a *dataprocJobAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error {
+	klog.V(2).Infof("creating dataproc job in project %s region %s", a.id.Parent().ProjectID, a.id.Parent().Location)
+
+	// Use the KRM-derived ID (resource name or spec.resourceID) as the job reference ID
+	// for submission. This keeps jobs findable later even if the service-generated UUID
+	// is lost. Guard against a nil Reference regardless of placement settings.
+	if a.desired.Reference == nil {
+		a.desired.Reference = &pb.JobReference{}
+	}
+	a.desired.Reference.JobId = a.id.ID()
+	if a.desired.Labels == nil {
+		a.desired.Labels = make(map[string]string)
+	}
+	a.desired.Labels["managed-by-cnrm"] = "true"
+
+	// Ensure placement is non-nil if it wasn't set, as it's often needed implicitly
+	// even if empty (e.g., for cluster selectors). The mapping function should handle this.
+
+	req := &pb.SubmitJobRequest{
+		ProjectId: a.id.Parent().ProjectID,
+		Region:    a.id.Parent().Location,
+		Job:       a.desired,
+	}
+
+	submittedJob, err := a.gcpClient.SubmitJob(ctx, req)
+	if err != nil {
+		return fmt.Errorf("submitting dataproc job in %s/%s: %w", a.id.Parent().ProjectID, a.id.Parent().Location, err)
+	}
+	klog.V(2).Infof("successfully submitted dataproc job %s/%s, service generated ID: %s", a.id.Parent().ProjectID, a.id.Parent().Location, submittedJob.GetReference().GetJobId())
+
+	a.actual = submittedJob // Store the immediate result as 'actual' state
+
+	// Map the *returned* job state to the KRM status
+	mapCtx := &direct.MapContext{}
+	status := &krm.DataprocJobStatus{}
+	status.ObservedState = DataprocJobObservedState_FromProto(mapCtx, submittedJob)
+	if mapCtx.Err() != nil {
+		return mapCtx.Err()
+	}
+
+	// ExternalRef records the fully qualified job name, built from the KRM-derived job ID.
+	status.ExternalRef = direct.PtrTo(fmt.Sprintf("projects/%s/regions/%s/jobs/%s", a.id.Parent().ProjectID, a.id.Parent().Location, a.id.ID()))
+	a.generatedId = direct.LazyPtr(submittedJob.JobUuid)
+
+	return createOp.UpdateStatus(ctx, status, nil)
+}
+
+func (a *dataprocJobAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error {
+	// Dataproc jobs are generally immutable after submission, except for labels.
+	// Ref: https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/patch
+	// We prevent updates to avoid unexpected behavior or errors.
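+	// Instead, we diff the desired and actual Jobs with output-only fields cleared:
+	// if anything other than labels differs we only refresh the status, and if just
+	// the labels differ we patch them via UpdateJob with a "labels" field mask.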
+ klog.Warningf("dataproc job %q: updates are not supported", a.id) + + // Check if only labels changed + onlyLabelsChanged := true + if a.actual == nil { + // Should not happen in Update path, but defensively return error + return fmt.Errorf("actual state is nil during update for job %q", a.id) + } + + // Create copies for comparison, normalizing potential output-only fields + desiredComparable := proto.Clone(a.desired).(*pb.Job) + actualComparable := proto.Clone(a.actual).(*pb.Job) + + // Clear fields that are not part of the spec or are output-only before comparison + clearOutputOnlyJobFields(desiredComparable) + clearOutputOnlyJobFields(actualComparable) + + // Compare everything except labels + desiredLabels := desiredComparable.Labels + actualLabels := actualComparable.Labels + desiredComparable.Labels = nil + actualComparable.Labels = nil + + if !proto.Equal(desiredComparable, actualComparable) { + onlyLabelsChanged = false + } + + // Restore labels for potential label update + desiredComparable.Labels = desiredLabels + actualComparable.Labels = actualLabels + + if !onlyLabelsChanged { + klog.Warningf("Dataproc Job can only update labels.") + // We still need to update the status with the observed state from Find() + status := &krm.DataprocJobStatus{} + mapCtx := &direct.MapContext{} + status.ObservedState = DataprocJobObservedState_FromProto(mapCtx, a.actual) + status.ExternalRef = direct.PtrTo(fmt.Sprintf("projects/%s/regions/%s/jobs/%s", a.id.Parent().ProjectID, a.id.Parent().Location, a.id.ID())) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + return updateOp.UpdateStatus(ctx, status, nil) + } + + // If only labels changed, proceed with label update + if !MapsEqual(a.desired.Labels, a.actual.Labels) { + klog.V(2).Infof("updating labels for dataproc job %q", a.id) + req := &pb.UpdateJobRequest{ + ProjectId: a.id.Parent().ProjectID, + Region: a.id.Parent().Location, + JobId: a.id.ID(), + Job: &pb.Job{ + Labels: a.desired.Labels, + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"labels"}}, + } + updatedJob, err := a.gcpClient.UpdateJob(ctx, req) + if err != nil { + return fmt.Errorf("updating labels for dataproc job %q: %w", a.id, err) + } + klog.V(2).Infof("successfully updated labels for dataproc job %q", a.id) + a.actual = updatedJob // Update actual state + + // Map the updated job state to the KRM status + mapCtx := &direct.MapContext{} + status := &krm.DataprocJobStatus{} + status.ObservedState = DataprocJobObservedState_FromProto(mapCtx, updatedJob) + status.ExternalRef = direct.PtrTo(fmt.Sprintf("projects/%s/regions/%s/jobs/%s", a.id.Parent().ProjectID, a.id.Parent().Location, a.id.ID())) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + return updateOp.UpdateStatus(ctx, status, nil) + } + + klog.V(2).Infof("no update needed for dataproc job %q", a.id) + return nil // No changes detected +} + +// clearOutputOnlyJobFields removes fields from a Job proto that are output-only +// or managed by the service, so they don't interfere with comparisons. 
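+// Note that it mutates the given Job in place, so callers should pass a proto.Clone
+// copy (as Update does above).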
+func clearOutputOnlyJobFields(job *pb.Job) { + if job == nil { + return + } + job.Status = nil + job.StatusHistory = nil + job.DriverOutputResourceUri = "" + job.DriverControlFilesUri = "" + job.JobUuid = "" // Service-generated UUID + job.Reference = nil // Contains service-generated ID + job.YarnApplications = nil // Runtime info + job.Done = false // Runtime status + // Placement might contain output info like cluster UUID if not specified by name + if job.Placement != nil { + job.Placement.ClusterUuid = "" + } +} + +func (a *dataprocJobAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { + if a.actual == nil { + return nil, fmt.Errorf("actual state is nil, cannot export") + } + + mapCtx := &direct.MapContext{} + spec := DataprocJobSpec_FromProto(mapCtx, a.actual) + if mapCtx.Err() != nil { + return nil, mapCtx.Err() + } + + obj := &krm.DataprocJob{ + Spec: *spec, + } + obj.Spec.ProjectRef = &refs.ProjectRef{External: a.id.Parent().ProjectID} + obj.Spec.Location = a.id.Parent().Location + + // Set the correct job type wrapper in the spec + switch jobType := a.actual.TypeJob.(type) { + case *pb.Job_HadoopJob: + obj.Spec.HadoopJob = spec.HadoopJob + case *pb.Job_SparkJob: + obj.Spec.SparkJob = spec.SparkJob + case *pb.Job_PysparkJob: + obj.Spec.PysparkJob = spec.PysparkJob + case *pb.Job_HiveJob: + obj.Spec.HiveJob = spec.HiveJob + case *pb.Job_PigJob: + obj.Spec.PigJob = spec.PigJob + case *pb.Job_SparkRJob: + obj.Spec.SparkRJob = spec.SparkRJob + case *pb.Job_SparkSqlJob: + obj.Spec.SparkSQLJob = spec.SparkSQLJob + case *pb.Job_PrestoJob: + obj.Spec.PrestoJob = spec.PrestoJob + case *pb.Job_TrinoJob: + obj.Spec.TrinoJob = spec.TrinoJob + default: + // This should ideally not happen if the job was fetched correctly + klog.Warningf("unknown job type found during export: %T", jobType) + } + obj.Status = krm.DataprocJobStatus{} + obj.Status.ObservedState = DataprocJobObservedState_FromProto(mapCtx, a.actual) + + uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, fmt.Errorf("converting DataprocJob to unstructured: %w", err) + } + + u := &unstructured.Unstructured{Object: uObj} + u.SetGroupVersionKind(krm.DataprocJobGVK) + + // Use the original reference job ID (if provided) or the service-generated ID as the name. + // Fallback to a generated name if needed, though unlikely. + name := a.actual.GetReference().GetJobId() + if name == "" { + // This should not happen for a fetched job + name = "dataproc-job-" + a.id.ID() + } + u.SetName(name) + + klog.V(2).Infof("exported dataproc job %q as KRM object", a.id) + return u, nil +} + +func (a *dataprocJobAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { + // Dataproc jobs can be cancelled, which stops them. Deleting removes the record. + // We will perform DeleteJob. If the job is terminal, Delete might be idempotent or unnecessary. + // If the job is active, cancellation might be preferred, but KCC delete means remove the resource. + if a.id.ID() == "" { + // Cannot delete if we don't have the ID. This might mean the job was never created successfully. + klog.Warningf("cannot delete dataproc job in %s/%s, job ID unknown", a.id.Parent().ProjectID, a.id.Parent().Location) + return true, nil // Assume already gone + } + + // Check if the job is already in a terminal state. If so, Delete might already be a no-op or unnecessary. + // However, the API might still allow deleting the record. We proceed with delete regardless. 
+ if a.actual != nil && a.actual.Status != nil { + state := a.actual.Status.State + if state == pb.JobStatus_DONE || state == pb.JobStatus_CANCELLED || state == pb.JobStatus_ERROR { + klog.V(2).Infof("dataproc job %q is already in terminal state %s, proceeding with deletion", a.id, state) + } + } + + klog.V(2).Infof("deleting dataproc job %q", a.id) + req := &pb.DeleteJobRequest{ + ProjectId: a.id.Parent().ProjectID, + Region: a.id.Parent().Location, + JobId: direct.ValueOf(a.generatedId), + } + + err := a.gcpClient.DeleteJob(ctx, req) + if err != nil { + if direct.IsNotFound(err) { + klog.V(2).Infof("dataproc job %q already deleted", a.id) + return true, nil // Already deleted + } + // Check for specific errors, e.g., if cancellation is needed first? + // The API doc doesn't specify preconditions like needing to cancel first. + return false, fmt.Errorf("deleting dataproc job %q: %w", a.id, err) + } + + klog.V(2).Infof("successfully deleted dataproc job %q", a.id) + return true, nil +} diff --git a/pkg/controller/direct/dataproc/utils.go b/pkg/controller/direct/dataproc/utils.go new file mode 100644 index 00000000000..1583f8b8e84 --- /dev/null +++ b/pkg/controller/direct/dataproc/utils.go @@ -0,0 +1,27 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dataproc + +func MapsEqual(desired map[string]string, actual map[string]string) bool { + if len(desired) != len(actual) { + return false + } + for k, v := range actual { + if desired[k] != v { + return false + } + } + return true +} diff --git a/pkg/controller/direct/redis/cluster/cluster_fuzzer.go b/pkg/controller/direct/redis/cluster/cluster_fuzzer.go index 1205e9409ba..abcd802ed2d 100644 --- a/pkg/controller/direct/redis/cluster/cluster_fuzzer.go +++ b/pkg/controller/direct/redis/cluster/cluster_fuzzer.go @@ -34,6 +34,21 @@ func redisClusterFuzzer() fuzztesting.KRMFuzzer { ) f.UnimplementedFields.Insert(".name") // Identifier + f.UnimplementedFields.Insert(".backup_collection") + f.UnimplementedFields.Insert(".managed_backup_source") + f.UnimplementedFields.Insert(".psc_service_attachments") + f.UnimplementedFields.Insert(".psc_connections[].psc_connection_status") + f.UnimplementedFields.Insert(".psc_connections[].service_attachment") + f.UnimplementedFields.Insert(".psc_connections[].connection_type") + f.UnimplementedFields.Insert(".cross_cluster_replication_config") + f.UnimplementedFields.Insert(".kms_key") + f.UnimplementedFields.Insert(".maintenance_policy") + f.UnimplementedFields.Insert(".maintenance_schedule") + f.UnimplementedFields.Insert(".automated_backup_config") + f.UnimplementedFields.Insert(".encryption_info") + f.UnimplementedFields.Insert(".gcs_source") + f.UnimplementedFields.Insert(".cluster_endpoints") + f.UnimplementedFields.Insert(".labels") // New Fields in the updated version of Redis Cluster f.UnimplementedFields.Insert(".backup_collection") diff --git a/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_generated_object_dataprocjob-minimal.golden.yaml b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_generated_object_dataprocjob-minimal.golden.yaml new file mode 100644 index 00000000000..8a4db78bb16 --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_generated_object_dataprocjob-minimal.golden.yaml @@ -0,0 +1,45 @@ +apiVersion: dataproc.cnrm.cloud.google.com/v1alpha1 +kind: DataprocJob +metadata: + finalizers: + - cnrm.cloud.google.com/finalizer + - cnrm.cloud.google.com/deletion-defender + generation: 1 + labels: + cnrm-test: "true" + name: dataprocjob-minimal-${uniqueId} + namespace: ${uniqueId} +spec: + hiveJob: + continueOnFailure: true + queryFileURI: gs://config-connector-sample/hivejob_sample.sql + parent: + location: us-central1 + projectRef: + external: ${projectId} + placement: + clusterName: dataproccluster-${uniqueId} +status: + conditions: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: The resource is up to date + reason: UpToDate + status: "True" + type: Ready + externalRef: projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId} + observedGeneration: 1 + observedState: + driverControlFilesURI: gs://dataproc-staging-us-central1-${projectNumber}-abcdef/google-cloud-dataproc-metainfo/dataproccluster-${uniqueId}/jobs/dataprocjob-minimal-${uniqueId}/ + driverOutputResourceURI: gs://dataproc-staging-us-central1-${projectNumber}-abcdef/google-cloud-dataproc-metainfo/dataproccluster-${uniqueId}/jobs/dataprocjob-minimal-${uniqueId}/driveroutput + placement: {} + status: + state: DONE + stateStartTime: "2024-04-01T12:34:56.123456Z" + statusHistory: + - state: PENDING + stateStartTime: "2024-04-01T12:34:56.123456Z" + - state: SETUP_DONE + stateStartTime: 
"2024-04-01T12:34:56.123456Z" + - details: Agent reported job success + state: RUNNING + stateStartTime: "2024-04-01T12:34:56.123456Z" diff --git a/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_http.log b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_http.log new file mode 100644 index 00000000000..9b8a118b5b3 --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/_http.log @@ -0,0 +1,839 @@ +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +404 Not Found +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "cluster \"projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}\" not found", + "reason": "notFound" + } + ], + "message": "cluster \"projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}\" not found", + "status": "NOT_FOUND" + } +} + +--- + +POST https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +{ + "clusterName": "dataproccluster-${uniqueId}", + "config": { + "gceClusterConfig": { + "internalIpOnly": true + }, + "masterConfig": { + "diskConfig": { + "bootDiskType": "pd-standard" + }, + "machineTypeUri": "n2-standard-4", + "numInstances": 1 + } + }, + "labels": { + "cnrm-test": "true", + "managed-by-cnrm": "true" + } +} + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "metadata": { + "@type": "type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata", + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "description": "Create cluster with 2 workers", + "operationType": "CREATE", + "status": { + "innerState": "PENDING", + "state": "PENDING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "warnings": [ + "The firewall rules for specified network or subnetwork would allow ingress traffic from 0.0.0.0/0, which could be a security risk.", + "The specified custom staging bucket '${dataStoreConfigBucketPath}' is not using uniform bucket level access IAM configuration. It is recommended to update bucket to enable the same. See https://cloud.google.com/storage/docs/uniform-bucket-level-access.", + "No image specified. Using the default image version. It is recommended to select a specific image version in production, as the default image version may change at any time." 
+ ] + }, + "name": "projects/${projectId}/regions/us-central1/operations/${operationID}" +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/operations/${operationID}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "done": true, + "metadata": { + "@type": "type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata", + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "description": "Create cluster with 2 workers", + "operationType": "CREATE", + "status": { + "innerState": "DONE", + "state": "DONE", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "statusHistory": [ + { + "state": "PENDING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + ], + "warnings": [ + "The firewall rules for specified network or subnetwork would allow ingress traffic from 0.0.0.0/0, which could be a security risk.", + "The specified custom staging bucket '${dataStoreConfigBucketPath}' is not using uniform bucket level access IAM configuration. It is recommended to update bucket to enable the same. See https://cloud.google.com/storage/docs/uniform-bucket-level-access.", + "No image specified. Using the default image version. It is recommended to select a specific image version in production, as the default image version may change at any time." 
+ ] + }, + "name": "projects/${projectId}/regions/us-central1/operations/${operationID}", + "response": { + "@type": "type.googleapis.com/google.cloud.dataproc.v1.Cluster", + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "config": { + "configBucket": "${dataStoreConfigBucketPath}", + "endpointConfig": {}, + "gceClusterConfig": { + "internalIpOnly": true, + "networkUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/global/networks/default", + "serviceAccountScopes": [ + "https://www.googleapis.com/auth/cloud-platform" + ], + "shieldedInstanceConfig": { + "enableIntegrityMonitoring": true, + "enableSecureBoot": true, + "enableVtpm": true + }, + "zoneUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c" + }, + "masterConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 1, + "preemptibility": "NON_PREEMPTIBLE" + }, + "softwareConfig": { + "imageVersion": "2.2.47-debian12", + "properties": { + "capacity-scheduler:yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator", + "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair", + "core:fs.gs.block.size": "134217728", + "core:fs.gs.metadata.cache.enable": "false", + "core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2", + "distcp:mapreduce.map.java.opts": "-Xmx768m", + "distcp:mapreduce.map.memory.mb": "1024", + "distcp:mapreduce.reduce.java.opts": "-Xmx768m", + "distcp:mapreduce.reduce.memory.mb": "1024", + "hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m", + "hdfs:dfs.datanode.address": "0.0.0.0:9866", + "hdfs:dfs.datanode.http.address": "0.0.0.0:9864", + "hdfs:dfs.datanode.https.address": "0.0.0.0:9865", + "hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867", + "hdfs:dfs.namenode.handler.count": "20", + "hdfs:dfs.namenode.http-address": "0.0.0.0:9870", + "hdfs:dfs.namenode.https-address": "0.0.0.0:9871", + "hdfs:dfs.namenode.lifeline.rpc-address": "test-${uniqueId}-m:8050", + "hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868", + "hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869", + "hdfs:dfs.namenode.service.handler.count": "10", + "hdfs:dfs.namenode.servicerpc-address": "test-${uniqueId}-m:8051", + "mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "4000", + "mapred:mapreduce.job.maps": "21", + "mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95", + "mapred:mapreduce.job.reduces": "7", + "mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService", + "mapred:mapreduce.map.cpu.vcores": "1", + "mapred:mapreduce.map.java.opts": "-Xmx2708m", + "mapred:mapreduce.map.memory.mb": "3386", + "mapred:mapreduce.reduce.cpu.vcores": "1", + "mapred:mapreduce.reduce.java.opts": "-Xmx2708m", + "mapred:mapreduce.reduce.memory.mb": "3386", + "mapred:mapreduce.task.io.sort.mb": "256", + "mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2708m", + "mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1", + "mapred:yarn.app.mapreduce.am.resource.mb": "3386", + "spark-env:SPARK_DAEMON_MEMORY": "4000m", + "spark:spark.driver.maxResultSize": "2048m", + 
"spark:spark.driver.memory": "4096m", + "spark:spark.executor.cores": "2", + "spark:spark.executor.instances": "2", + "spark:spark.executor.memory": "6157m", + "spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1", + "spark:spark.plugins.defaultList": "com.google.cloud.dataproc.DataprocSparkPlugin", + "spark:spark.scheduler.mode": "FAIR", + "spark:spark.sql.cbo.enabled": "true", + "spark:spark.sql.optimizer.runtime.bloomFilter.join.pattern.enabled": "true", + "spark:spark.ui.port": "0", + "spark:spark.yarn.am.memory": "640m", + "yarn-env:YARN_NODEMANAGER_HEAPSIZE": "1638", + "yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "4000", + "yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "4000", + "yarn:yarn.nodemanager.address": "0.0.0.0:8026", + "yarn:yarn.nodemanager.resource.cpu-vcores": "4", + "yarn:yarn.nodemanager.resource.memory-mb": "13544", + "yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true", + "yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400", + "yarn:yarn.scheduler.maximum-allocation-mb": "13544", + "yarn:yarn.scheduler.minimum-allocation-mb": "1" + } + }, + "tempBucket": "${dataStoreTempBucketPath}", + "workerConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 2, + "preemptibility": "NON_PREEMPTIBLE" + } + }, + "labels": { + "cnrm-test": "true", + "goog-dataproc-autozone": "enabled", + "goog-dataproc-cluster-name": "dataproccluster-${uniqueId}", + "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", + "goog-dataproc-location": "us-central1", + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" + }, + "projectId": "${projectId}" + } +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "config": { + "configBucket": "${dataStoreConfigBucketPath}", + "endpointConfig": {}, + "gceClusterConfig": { + "internalIpOnly": true, + "networkUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/global/networks/default", + "serviceAccountScopes": [ + "https://www.googleapis.com/auth/cloud-platform" + ], + "shieldedInstanceConfig": { + "enableIntegrityMonitoring": true, + "enableSecureBoot": true, + "enableVtpm": true + }, + "zoneUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c" + }, + "masterConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "instanceNames": [ + "dataproccluster-${uniqueId}-m" + ], + "machineTypeUri": 
"https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 1, + "preemptibility": "NON_PREEMPTIBLE" + }, + "softwareConfig": { + "imageVersion": "2.2.47-debian12", + "properties": { + "capacity-scheduler:yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator", + "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair", + "core:fs.gs.block.size": "134217728", + "core:fs.gs.metadata.cache.enable": "false", + "core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2", + "distcp:mapreduce.map.java.opts": "-Xmx768m", + "distcp:mapreduce.map.memory.mb": "1024", + "distcp:mapreduce.reduce.java.opts": "-Xmx768m", + "distcp:mapreduce.reduce.memory.mb": "1024", + "hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m", + "hdfs:dfs.datanode.address": "0.0.0.0:9866", + "hdfs:dfs.datanode.http.address": "0.0.0.0:9864", + "hdfs:dfs.datanode.https.address": "0.0.0.0:9865", + "hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867", + "hdfs:dfs.namenode.handler.count": "20", + "hdfs:dfs.namenode.http-address": "0.0.0.0:9870", + "hdfs:dfs.namenode.https-address": "0.0.0.0:9871", + "hdfs:dfs.namenode.lifeline.rpc-address": "test-${uniqueId}-m:8050", + "hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868", + "hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869", + "hdfs:dfs.namenode.service.handler.count": "10", + "hdfs:dfs.namenode.servicerpc-address": "test-${uniqueId}-m:8051", + "mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "4000", + "mapred:mapreduce.job.maps": "21", + "mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95", + "mapred:mapreduce.job.reduces": "7", + "mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService", + "mapred:mapreduce.map.cpu.vcores": "1", + "mapred:mapreduce.map.java.opts": "-Xmx2708m", + "mapred:mapreduce.map.memory.mb": "3386", + "mapred:mapreduce.reduce.cpu.vcores": "1", + "mapred:mapreduce.reduce.java.opts": "-Xmx2708m", + "mapred:mapreduce.reduce.memory.mb": "3386", + "mapred:mapreduce.task.io.sort.mb": "256", + "mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2708m", + "mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1", + "mapred:yarn.app.mapreduce.am.resource.mb": "3386", + "spark-env:SPARK_DAEMON_MEMORY": "4000m", + "spark:spark.driver.maxResultSize": "2048m", + "spark:spark.driver.memory": "4096m", + "spark:spark.executor.cores": "2", + "spark:spark.executor.instances": "2", + "spark:spark.executor.memory": "6157m", + "spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1", + "spark:spark.plugins.defaultList": "com.google.cloud.dataproc.DataprocSparkPlugin", + "spark:spark.scheduler.mode": "FAIR", + "spark:spark.sql.cbo.enabled": "true", + "spark:spark.sql.optimizer.runtime.bloomFilter.join.pattern.enabled": "true", + "spark:spark.ui.port": "0", + "spark:spark.yarn.am.memory": "640m", + "yarn-env:YARN_NODEMANAGER_HEAPSIZE": "1638", + "yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "4000", + "yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "4000", + "yarn:yarn.nodemanager.address": "0.0.0.0:8026", + "yarn:yarn.nodemanager.resource.cpu-vcores": "4", + "yarn:yarn.nodemanager.resource.memory-mb": "13544", + "yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true", + "yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400", + "yarn:yarn.scheduler.maximum-allocation-mb": 
"13544", + "yarn:yarn.scheduler.minimum-allocation-mb": "1" + } + }, + "tempBucket": "${dataStoreTempBucketPath}", + "workerConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "instanceNames": [ + "dataproccluster-${uniqueId}-w-0", + "dataproccluster-${uniqueId}-w-1" + ], + "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 2, + "preemptibility": "NON_PREEMPTIBLE" + } + }, + "labels": { + "cnrm-test": "true", + "goog-dataproc-autozone": "enabled", + "goog-dataproc-cluster-name": "dataproccluster-${uniqueId}", + "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", + "goog-dataproc-location": "us-central1", + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" + }, + "projectId": "${projectId}", + "status": { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "statusHistory": [ + { + "state": "CREATING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + ] +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} +X-Goog-Request-Params: project_id=${projectId}®ion=us-central1&job_id=dataprocjob-minimal-${uniqueId} + +404 Not Found +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "reason": "notFound" + } + ], + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "status": "NOT_FOUND" + } +} + +--- + +POST https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/jobs:submit?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} +X-Goog-Request-Params: project_id=${projectId}®ion=us-central1 + +{ + "job": { + "hiveJob": { + "continueOnFailure": true, + "queryFileUri": "gs://config-connector-sample/hivejob_sample.sql" + }, + "labels": { + "managed-by-cnrm": "true" + }, + "placement": { + "clusterName": "dataproccluster-${uniqueId}" + }, + "reference": { + "jobId": "dataprocjob-minimal-${uniqueId}" + } + }, + "projectId": "${projectId}", + "region": "us-central1" +} + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "driverControlFilesUri": "gs://dataproc-staging-us-central1-${projectNumber}-abcdef/google-cloud-dataproc-metainfo/dataproccluster-${uniqueId}/jobs/dataprocjob-minimal-${uniqueId}/", + "driverOutputResourceUri": 
"gs://dataproc-staging-us-central1-${projectNumber}-abcdef/google-cloud-dataproc-metainfo/dataproccluster-${uniqueId}/jobs/dataprocjob-minimal-${uniqueId}/driveroutput", + "hiveJob": { + "continueOnFailure": true, + "queryFileUri": "gs://config-connector-sample/hivejob_sample.sql" + }, + "labels": { + "managed-by-cnrm": "true" + }, + "placement": { + "clusterName": "dataproccluster-${uniqueId}" + }, + "reference": { + "jobId": "dataprocjob-minimal-${uniqueId}", + "projectId": "${projectId}" + }, + "status": { + "state": 5, + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "statusHistory": [ + { + "state": 1, + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "state": 8, + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "details": "Agent reported job success", + "state": 2, + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + ] +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} +X-Goog-Request-Params: project_id=${projectId}®ion=us-central1&job_id=dataprocjob-minimal-${uniqueId} + +404 Not Found +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "reason": "notFound" + } + ], + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "status": "NOT_FOUND" + } +} + +--- + +DELETE https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} +X-Goog-Request-Params: project_id=${projectId}®ion=us-central1&job_id=dataprocjob-minimal-${uniqueId} + +404 Not Found +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "reason": "notFound" + } + ], + "message": "job \"projects/${projectId}/regions/us-central1/jobs/dataprocjob-minimal-${uniqueId}\" not found", + "status": "NOT_FOUND" + } +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "config": { + "configBucket": "${dataStoreConfigBucketPath}", + "endpointConfig": {}, + 
"gceClusterConfig": { + "internalIpOnly": true, + "networkUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/global/networks/default", + "serviceAccountScopes": [ + "https://www.googleapis.com/auth/cloud-platform" + ], + "shieldedInstanceConfig": { + "enableIntegrityMonitoring": true, + "enableSecureBoot": true, + "enableVtpm": true + }, + "zoneUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c" + }, + "masterConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "instanceNames": [ + "dataproccluster-${uniqueId}-m" + ], + "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 1, + "preemptibility": "NON_PREEMPTIBLE" + }, + "softwareConfig": { + "imageVersion": "2.2.47-debian12", + "properties": { + "capacity-scheduler:yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator", + "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair", + "core:fs.gs.block.size": "134217728", + "core:fs.gs.metadata.cache.enable": "false", + "core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2", + "distcp:mapreduce.map.java.opts": "-Xmx768m", + "distcp:mapreduce.map.memory.mb": "1024", + "distcp:mapreduce.reduce.java.opts": "-Xmx768m", + "distcp:mapreduce.reduce.memory.mb": "1024", + "hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m", + "hdfs:dfs.datanode.address": "0.0.0.0:9866", + "hdfs:dfs.datanode.http.address": "0.0.0.0:9864", + "hdfs:dfs.datanode.https.address": "0.0.0.0:9865", + "hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867", + "hdfs:dfs.namenode.handler.count": "20", + "hdfs:dfs.namenode.http-address": "0.0.0.0:9870", + "hdfs:dfs.namenode.https-address": "0.0.0.0:9871", + "hdfs:dfs.namenode.lifeline.rpc-address": "test-${uniqueId}-m:8050", + "hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868", + "hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869", + "hdfs:dfs.namenode.service.handler.count": "10", + "hdfs:dfs.namenode.servicerpc-address": "test-${uniqueId}-m:8051", + "mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "4000", + "mapred:mapreduce.job.maps": "21", + "mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95", + "mapred:mapreduce.job.reduces": "7", + "mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService", + "mapred:mapreduce.map.cpu.vcores": "1", + "mapred:mapreduce.map.java.opts": "-Xmx2708m", + "mapred:mapreduce.map.memory.mb": "3386", + "mapred:mapreduce.reduce.cpu.vcores": "1", + "mapred:mapreduce.reduce.java.opts": "-Xmx2708m", + "mapred:mapreduce.reduce.memory.mb": "3386", + "mapred:mapreduce.task.io.sort.mb": "256", + "mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2708m", + "mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1", + "mapred:yarn.app.mapreduce.am.resource.mb": "3386", + "spark-env:SPARK_DAEMON_MEMORY": "4000m", + "spark:spark.driver.maxResultSize": "2048m", + "spark:spark.driver.memory": "4096m", + "spark:spark.executor.cores": "2", + "spark:spark.executor.instances": "2", + "spark:spark.executor.memory": "6157m", + "spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1", + "spark:spark.plugins.defaultList": "com.google.cloud.dataproc.DataprocSparkPlugin", + 
"spark:spark.scheduler.mode": "FAIR", + "spark:spark.sql.cbo.enabled": "true", + "spark:spark.sql.optimizer.runtime.bloomFilter.join.pattern.enabled": "true", + "spark:spark.ui.port": "0", + "spark:spark.yarn.am.memory": "640m", + "yarn-env:YARN_NODEMANAGER_HEAPSIZE": "1638", + "yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "4000", + "yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "4000", + "yarn:yarn.nodemanager.address": "0.0.0.0:8026", + "yarn:yarn.nodemanager.resource.cpu-vcores": "4", + "yarn:yarn.nodemanager.resource.memory-mb": "13544", + "yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true", + "yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400", + "yarn:yarn.scheduler.maximum-allocation-mb": "13544", + "yarn:yarn.scheduler.minimum-allocation-mb": "1" + } + }, + "tempBucket": "${dataStoreTempBucketPath}", + "workerConfig": { + "diskConfig": { + "bootDiskSizeGb": 1000, + "bootDiskType": "pd-standard" + }, + "imageUri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-2-deb12-20250212-155100-rc01", + "instanceNames": [ + "dataproccluster-${uniqueId}-w-0", + "dataproccluster-${uniqueId}-w-1" + ], + "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/${projectId}/zones/us-central1-c/machineTypes/n2-standard-4", + "minCpuPlatform": "AUTOMATIC", + "numInstances": 2, + "preemptibility": "NON_PREEMPTIBLE" + } + }, + "labels": { + "cnrm-test": "true", + "goog-dataproc-autozone": "enabled", + "goog-dataproc-cluster-name": "dataproccluster-${uniqueId}", + "goog-dataproc-cluster-uuid": "${dataStoreClusterUUID}", + "goog-dataproc-location": "us-central1", + "goog-drz-dataproc-uuid": "cluster-${dataStoreClusterUUID}", + "managed-by-cnrm": "true" + }, + "projectId": "${projectId}", + "status": { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "statusHistory": [ + { + "state": "CREATING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + ] +} + +--- + +DELETE https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "metadata": { + "@type": "type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata", + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "description": "Delete cluster", + "operationType": "DELETE", + "status": { + "innerState": "PENDING", + "state": "PENDING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + }, + "name": "projects/${projectId}/regions/us-central1/operations/${operationID}" +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/operations/${operationID}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +200 OK +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "done": 
true, + "metadata": { + "@type": "type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata", + "clusterName": "dataproccluster-${uniqueId}", + "clusterUuid": "${dataStoreClusterUUID}", + "description": "Delete cluster", + "operationType": "DELETE", + "status": { + "innerState": "DONE", + "state": "DONE", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + "statusHistory": [ + { + "state": "PENDING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + }, + { + "state": "RUNNING", + "stateStartTime": "2024-04-01T12:34:56.123456Z" + } + ] + }, + "name": "projects/${projectId}/regions/us-central1/operations/${operationID}", + "response": { + "@type": "type.googleapis.com/google.protobuf.Empty" + } +} + +--- + +GET https://dataproc.googleapis.com/v1/projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}?alt=json +Content-Type: application/json +User-Agent: kcc/${kccVersion} (+https://github.com/GoogleCloudPlatform/k8s-config-connector) kcc/controller-manager/${kccVersion} DeclarativeClientLib/0.0.1 + +404 Not Found +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "cluster \"projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}\" not found", + "reason": "notFound" + } + ], + "message": "cluster \"projects/${projectId}/regions/us-central1/clusters/dataproccluster-${uniqueId}\" not found", + "status": "NOT_FOUND" + } +} \ No newline at end of file diff --git a/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/create.yaml b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/create.yaml new file mode 100644 index 00000000000..2195c08232b --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/create.yaml @@ -0,0 +1,28 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+apiVersion: dataproc.cnrm.cloud.google.com/v1alpha1
+kind: DataprocJob
+metadata:
+  name: dataprocjob-minimal-${uniqueId}
+spec:
+  parent:
+    projectRef:
+      external: ${projectId}
+    location: us-central1
+  placement:
+    clusterName: dataproccluster-${uniqueId}
+  hiveJob:
+    queryFileURI: "gs://config-connector-sample/hivejob_sample.sql"
+    continueOnFailure: true
\ No newline at end of file
diff --git a/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/dependencies.yaml b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/dependencies.yaml
new file mode 100644
index 00000000000..1fdc0860dbd
--- /dev/null
+++ b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/dependencies.yaml
@@ -0,0 +1,30 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: dataproc.cnrm.cloud.google.com/v1beta1
+kind: DataprocCluster
+metadata:
+  name: dataproccluster-${uniqueId}
+spec:
+  projectRef:
+    external: projects/${projectId}
+  location: "us-central1"
+  config:
+    gceClusterConfig:
+      internalIPOnly: true
+    masterConfig:
+      diskConfig:
+        bootDiskType: pd-standard
+      machineType: "n2-standard-4"
+      numInstances: 1
\ No newline at end of file
diff --git a/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/update.yaml b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/update.yaml
new file mode 100644
index 00000000000..2195c08232b
--- /dev/null
+++ b/pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/update.yaml
@@ -0,0 +1,28 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: dataproc.cnrm.cloud.google.com/v1alpha1
+kind: DataprocJob
+metadata:
+  name: dataprocjob-minimal-${uniqueId}
+spec:
+  parent:
+    projectRef:
+      external: ${projectId}
+    location: us-central1
+  placement:
+    clusterName: dataproccluster-${uniqueId}
+  hiveJob:
+    queryFileURI: "gs://config-connector-sample/hivejob_sample.sql"
+    continueOnFailure: true
\ No newline at end of file
diff --git a/tests/e2e/normalize.go b/tests/e2e/normalize.go
index 54c924d1b80..091d7e70d5f 100644
--- a/tests/e2e/normalize.go
+++ b/tests/e2e/normalize.go
@@ -111,6 +111,8 @@ func normalizeKRMObject(t *testing.T, u *unstructured.Unstructured, project test
 	//Specific to Dataproc
 	visitor.replacePaths[".status.observedState.stateHistory[].stateStartTime"] = "2024-04-01T12:34:56.123456Z"
 	visitor.replacePaths[".status.observedState.stateTime"] = "2024-04-01T12:34:56.123456Z"
+	visitor.replacePaths[".status.observedState.statusHistory[].stateStartTime"] = "2024-04-01T12:34:56.123456Z"
+	visitor.replacePaths[".status.observedState.status.stateStartTime"] = "2024-04-01T12:34:56.123456Z"
 	visitor.replacePaths[".status.observedState.outputUri"] = "gs://dataproc-staging-us-central1-${projectNumber}-h/google-cloud-dataproc-metainfo/fffc/jobs/srvls-batch/driveroutput"

 	// Specific to Firestore