Skip to content

Commit 84f4efd

Browse files
committed
address review comments
1 parent b6bc8c3 commit 84f4efd

File tree

2 files changed

+88
-91
lines changed
  • pkg

2 files changed

+88
-91
lines changed

pkg/controller/direct/dataproc/job_controller.go

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -79,19 +79,6 @@ func (m *dataprocJobModel) AdapterForObject(ctx context.Context, reader client.R
7979
return nil, fmt.Errorf("error converting to %T: %w", obj, err)
8080
}
8181

82-
projectRef, err := refs.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef)
83-
if err != nil {
84-
return nil, err
85-
}
86-
projectID := projectRef.ProjectID
87-
if projectID == "" {
88-
return nil, fmt.Errorf("cannot resolve project")
89-
}
90-
location := obj.Spec.Location
91-
if location == "" {
92-
return nil, fmt.Errorf("region is required")
93-
}
94-
9582
// The job ID is part of the job placement config in the proto, not the top-level resource ID.
9683
// The job resource itself uses a UUID generated by the service (`reference.jobId`).
9784
// We construct an internal ID representation that includes the project and location.
@@ -101,44 +88,6 @@ func (m *dataprocJobModel) AdapterForObject(ctx context.Context, reader client.R
10188
return nil, fmt.Errorf("failed to create new Job identity, error: %w", err)
10289
}
10390

104-
mapCtx := &direct.MapContext{}
105-
desiredProto := DataprocJobSpec_ToProto(mapCtx, &obj.Spec)
106-
if mapCtx.Err() != nil {
107-
return nil, mapCtx.Err()
108-
}
109-
// We need the full Job message, not just the spec part.
110-
desired := &pb.Job{
111-
Placement: desiredProto.Placement,
112-
JobUuid: desiredProto.JobUuid,
113-
Scheduling: desiredProto.Scheduling,
114-
YarnApplications: desiredProto.YarnApplications,
115-
DriverSchedulingConfig: desiredProto.DriverSchedulingConfig,
116-
Done: desiredProto.Done,
117-
Labels: desiredProto.Labels,
118-
}
119-
// Populate the job type specific field
120-
if obj.Spec.HadoopJob != nil {
121-
desired.TypeJob = &pb.Job_HadoopJob{HadoopJob: desiredProto.GetHadoopJob()}
122-
} else if obj.Spec.SparkJob != nil {
123-
desired.TypeJob = &pb.Job_SparkJob{SparkJob: desiredProto.GetSparkJob()}
124-
} else if obj.Spec.PysparkJob != nil {
125-
desired.TypeJob = &pb.Job_PysparkJob{PysparkJob: desiredProto.GetPysparkJob()}
126-
} else if obj.Spec.HiveJob != nil {
127-
desired.TypeJob = &pb.Job_HiveJob{HiveJob: desiredProto.GetHiveJob()}
128-
} else if obj.Spec.PigJob != nil {
129-
desired.TypeJob = &pb.Job_PigJob{PigJob: desiredProto.GetPigJob()}
130-
} else if obj.Spec.SparkRJob != nil {
131-
desired.TypeJob = &pb.Job_SparkRJob{SparkRJob: desiredProto.GetSparkRJob()}
132-
} else if obj.Spec.SparkSQLJob != nil {
133-
desired.TypeJob = &pb.Job_SparkSqlJob{SparkSqlJob: desiredProto.GetSparkSqlJob()}
134-
} else if obj.Spec.PrestoJob != nil {
135-
desired.TypeJob = &pb.Job_PrestoJob{PrestoJob: desiredProto.GetPrestoJob()}
136-
} else if obj.Spec.TrinoJob != nil {
137-
desired.TypeJob = &pb.Job_TrinoJob{TrinoJob: desiredProto.GetTrinoJob()}
138-
} else {
139-
return nil, fmt.Errorf("unknown job type specified")
140-
}
141-
14291
gcpClient, err := m.client(ctx)
14392
if err != nil {
14493
return nil, err
@@ -147,7 +96,7 @@ func (m *dataprocJobModel) AdapterForObject(ctx context.Context, reader client.R
14796
return &dataprocJobAdapter{
14897
gcpClient: gcpClient,
14998
id: jobID,
150-
desired: desired,
99+
desired: obj,
151100
generatedId: direct.LazyPtr(jobID.ID()),
152101
}, nil
153102
}
@@ -162,7 +111,7 @@ var _ directbase.Adapter = &dataprocJobAdapter{}
162111
type dataprocJobAdapter struct {
163112
gcpClient *dataproc.JobControllerClient
164113
id *krm.JobIdentity // Contains ProjectID, Region, and potentially JobID (after creation or from URL)
165-
desired *pb.Job // Desired state constructed from KRM spec
114+
desired *krm.DataprocJob // Desired state
166115
actual *pb.Job // Actual state fetched from GCP
167116
generatedId *string
168117
}
@@ -212,30 +161,69 @@ func (a *dataprocJobAdapter) Find(ctx context.Context) (bool, error) {
212161
func (a *dataprocJobAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error {
213162
klog.V(2).Infof("creating dataproc job in project %s region %s", a.id.Parent().ProjectID, a.id.Parent().Location)
214163

164+
mapCtx := &direct.MapContext{}
165+
desiredProto := DataprocJobSpec_ToProto(mapCtx, &a.desired.Spec)
166+
if mapCtx.Err() != nil {
167+
return mapCtx.Err()
168+
}
169+
// We need the full Job message, not just the spec part.
170+
desired := &pb.Job{
171+
Placement: desiredProto.Placement,
172+
JobUuid: desiredProto.JobUuid,
173+
Scheduling: desiredProto.Scheduling,
174+
YarnApplications: desiredProto.YarnApplications,
175+
DriverSchedulingConfig: desiredProto.DriverSchedulingConfig,
176+
Done: desiredProto.Done,
177+
Labels: desiredProto.Labels,
178+
}
179+
// Populate the job type specific field
180+
if a.desired.Spec.HadoopJob != nil {
181+
desired.TypeJob = &pb.Job_HadoopJob{HadoopJob: desiredProto.GetHadoopJob()}
182+
} else if a.desired.Spec.SparkJob != nil {
183+
desired.TypeJob = &pb.Job_SparkJob{SparkJob: desiredProto.GetSparkJob()}
184+
} else if a.desired.Spec.PysparkJob != nil {
185+
desired.TypeJob = &pb.Job_PysparkJob{PysparkJob: desiredProto.GetPysparkJob()}
186+
} else if a.desired.Spec.HiveJob != nil {
187+
desired.TypeJob = &pb.Job_HiveJob{HiveJob: desiredProto.GetHiveJob()}
188+
} else if a.desired.Spec.PigJob != nil {
189+
desired.TypeJob = &pb.Job_PigJob{PigJob: desiredProto.GetPigJob()}
190+
} else if a.desired.Spec.SparkRJob != nil {
191+
desired.TypeJob = &pb.Job_SparkRJob{SparkRJob: desiredProto.GetSparkRJob()}
192+
} else if a.desired.Spec.SparkSQLJob != nil {
193+
desired.TypeJob = &pb.Job_SparkSqlJob{SparkSqlJob: desiredProto.GetSparkSqlJob()}
194+
} else if a.desired.Spec.PrestoJob != nil {
195+
desired.TypeJob = &pb.Job_PrestoJob{PrestoJob: desiredProto.GetPrestoJob()}
196+
} else if a.desired.Spec.TrinoJob != nil {
197+
desired.TypeJob = &pb.Job_TrinoJob{TrinoJob: desiredProto.GetTrinoJob()}
198+
} else {
199+
return fmt.Errorf("unknown job type specified")
200+
}
201+
215202
// Set the user-specified ID if provided in the spec.
216-
if a.desired.GetPlacement() != nil && a.desired.GetPlacement().GetClusterUuid() != "" {
203+
if desired.GetPlacement() != nil && desired.GetPlacement().GetClusterUuid() != "" {
217204
// Placement.JobUuid is deprecated. Prefer setting reference.job_id
218205
} else {
219206
// If no specific job ID is in the spec, use the KRM resource name/resourceID
220207
// as the reference ID for submission. This makes jobs findable later if the
221208
// service-generated UUID is lost.
222-
if a.desired.Reference == nil {
223-
a.desired.Reference = &pb.JobReference{}
209+
if desired.Reference == nil {
210+
desired.Reference = &pb.JobReference{}
224211
}
225212
}
226-
a.desired.Reference.JobId = a.id.ID()
227-
if a.desired.Labels == nil {
228-
a.desired.Labels = make(map[string]string)
213+
214+
desired.Reference.JobId = a.id.ID()
215+
if desired.Labels == nil {
216+
desired.Labels = make(map[string]string)
229217
}
230-
a.desired.Labels["managed-by-cnrm"] = "true"
218+
desired.Labels["managed-by-cnrm"] = "true"
231219

232220
// Ensure placement is non-nil if it wasn't set, as it's often needed implicitly
233221
// even if empty (e.g., for cluster selectors). The mapping function should handle this.
234222

235223
req := &pb.SubmitJobRequest{
236224
ProjectId: a.id.Parent().ProjectID,
237225
Region: a.id.Parent().Location,
238-
Job: a.desired,
226+
Job: desired,
239227
}
240228

241229
submittedJob, err := a.gcpClient.SubmitJob(ctx, req)
@@ -247,7 +235,6 @@ func (a *dataprocJobAdapter) Create(ctx context.Context, createOp *directbase.Cr
247235
a.actual = submittedJob // Store the immediate result as 'actual' state
248236

249237
// Map the *returned* job state to the KRM status
250-
mapCtx := &direct.MapContext{}
251238
status := &krm.DataprocJobStatus{}
252239
status.ObservedState = DataprocJobObservedState_FromProto(mapCtx, submittedJob)
253240
if mapCtx.Err() != nil {
@@ -277,8 +264,46 @@ func (a *dataprocJobAdapter) Update(ctx context.Context, updateOp *directbase.Up
277264
return fmt.Errorf("actual state is nil during update for job %q", a.id)
278265
}
279266

267+
mapCtx := &direct.MapContext{}
268+
desiredProto := DataprocJobSpec_ToProto(mapCtx, &a.desired.Spec)
269+
if mapCtx.Err() != nil {
270+
return mapCtx.Err()
271+
}
272+
// We need the full Job message, not just the spec part.
273+
desired := &pb.Job{
274+
Placement: desiredProto.Placement,
275+
JobUuid: desiredProto.JobUuid,
276+
Scheduling: desiredProto.Scheduling,
277+
YarnApplications: desiredProto.YarnApplications,
278+
DriverSchedulingConfig: desiredProto.DriverSchedulingConfig,
279+
Done: desiredProto.Done,
280+
Labels: desiredProto.Labels,
281+
}
282+
// Populate the job type specific field
283+
if a.desired.Spec.HadoopJob != nil {
284+
desired.TypeJob = &pb.Job_HadoopJob{HadoopJob: desiredProto.GetHadoopJob()}
285+
} else if a.desired.Spec.SparkJob != nil {
286+
desired.TypeJob = &pb.Job_SparkJob{SparkJob: desiredProto.GetSparkJob()}
287+
} else if a.desired.Spec.PysparkJob != nil {
288+
desired.TypeJob = &pb.Job_PysparkJob{PysparkJob: desiredProto.GetPysparkJob()}
289+
} else if a.desired.Spec.HiveJob != nil {
290+
desired.TypeJob = &pb.Job_HiveJob{HiveJob: desiredProto.GetHiveJob()}
291+
} else if a.desired.Spec.PigJob != nil {
292+
desired.TypeJob = &pb.Job_PigJob{PigJob: desiredProto.GetPigJob()}
293+
} else if a.desired.Spec.SparkRJob != nil {
294+
desired.TypeJob = &pb.Job_SparkRJob{SparkRJob: desiredProto.GetSparkRJob()}
295+
} else if a.desired.Spec.SparkSQLJob != nil {
296+
desired.TypeJob = &pb.Job_SparkSqlJob{SparkSqlJob: desiredProto.GetSparkSqlJob()}
297+
} else if a.desired.Spec.PrestoJob != nil {
298+
desired.TypeJob = &pb.Job_PrestoJob{PrestoJob: desiredProto.GetPrestoJob()}
299+
} else if a.desired.Spec.TrinoJob != nil {
300+
desired.TypeJob = &pb.Job_TrinoJob{TrinoJob: desiredProto.GetTrinoJob()}
301+
} else {
302+
return fmt.Errorf("unknown job type specified")
303+
}
304+
280305
// Create copies for comparison, normalizing potential output-only fields
281-
desiredComparable := proto.Clone(a.desired).(*pb.Job)
306+
desiredComparable := proto.Clone(desired).(*pb.Job)
282307
actualComparable := proto.Clone(a.actual).(*pb.Job)
283308

284309
// Clear fields that are not part of the spec or are output-only before comparison

pkg/test/resourcefixture/testdata/basic/dataproc/v1alpha1/dataprocjob/dataprocjob-minimal/update.yaml

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)