Skip to content

Commit 2e33491

Browse files
committed
Merge remote-tracking branch 'upstream/main'
merged with forked repo
2 parents 583d82e + 5fb693f commit 2e33491

File tree

2,128 files changed

+106016
-54460
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

2,128 files changed

+106016
-54460
lines changed

.github/workflows/kind-e2e.yaml

+8-8
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ jobs:
1717
fail-fast: false # Keep running if one leg fails.
1818
matrix:
1919
k8s-version:
20-
- v1.28.7
21-
- v1.29.0
20+
- v1.30.8
21+
- v1.31.4
2222

2323
test-suite:
2424
- ./test/e2e
@@ -29,12 +29,12 @@ jobs:
2929
# This is attempting to make it a bit clearer what's being tested.
3030
# See: https://github.com/kubernetes-sigs/kind/releases/tag/v0.20.0
3131
include:
32-
- k8s-version: v1.28.7
33-
kind-version: v0.21.0
34-
kind-image-sha: sha256:9bc6c451a289cf96ad0bbaf33d416901de6fd632415b076ab05f5fa7e4f65c58
35-
- k8s-version: v1.29.0
36-
kind-version: v0.21.0
37-
kind-image-sha: sha256:eaa1450915475849a73a9227b8f201df25e55e268e5d619312131292e324d570
32+
- k8s-version: v1.30.8
33+
kind-version: v0.26.0
34+
kind-image-sha: sha256:17cd608b3971338d9180b00776cb766c50d0a0b6b904ab4ff52fd3fc5c6369bf
35+
- k8s-version: v1.31.4
36+
kind-version: v0.26.0
37+
kind-image-sha: sha256:2cb39f7295fe7eafee0842b1052a599a4fb0f8bcf3f83d96c7f4864c357c6c30
3838

3939
# Add the flags we use for each of these test suites.
4040
- test-suite: ./test/e2e

OWNERS_ALIASES

+5-20
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@ aliases:
1313
- vyasgun
1414
docs-reviewers:
1515
- nainaz
16-
- retocode
1716
- skonto
1817
docs-writers:
1918
- csantanapr
20-
- retocode
2119
- skonto
2220
eventing-reviewers:
2321
- Leo6Leo
@@ -51,7 +49,6 @@ aliases:
5149
knative-admin:
5250
- aliok
5351
- cardil
54-
- davidhadas
5552
- dprotaso
5653
- dsimansk
5754
- evankanderson
@@ -60,15 +57,13 @@ aliases:
6057
- knative-prow-robot
6158
- knative-prow-updater-robot
6259
- knative-test-reporter-robot
63-
- nainaz
64-
- psschwei
65-
- retocode
66-
- salaboy
60+
- matzew
61+
- nrrso
6762
- skonto
6863
- upodroid
6964
knative-release-leads:
65+
- dprotaso
7066
- dsimansk
71-
- retocode
7267
- skonto
7368
knative-robots:
7469
- knative-automation
@@ -105,32 +100,22 @@ aliases:
105100
- davidhadas
106101
- evankanderson
107102
serving-approvers:
108-
- ReToCode
109103
- skonto
110104
serving-reviewers:
111-
- izabelacg
112-
- retocode
113105
- skonto
114106
serving-triage:
115-
- izabelacg
116-
- retocode
117107
- skonto
118108
serving-wg-leads:
119109
- dprotaso
120110
serving-writers:
121-
- ReToCode
122111
- dprotaso
123112
- skonto
124113
steering-committee:
125114
- aliok
126-
- davidhadas
127115
- dprotaso
128-
- dsimansk
129116
- evankanderson
130-
- nainaz
131-
- psschwei
132-
- salaboy
133-
technical-oversight-committee: []
117+
- matzew
118+
- nrrso
134119
ux-wg-leads:
135120
- cali0707
136121
- leo6leo

cmd/controller/main.go

+3-39
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ import (
2020
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
2121
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
2222

23-
"errors"
24-
"log"
25-
"net/http"
26-
"os"
27-
"time"
28-
2923
"knative.dev/pkg/injection/sharedmain"
3024

3125
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
@@ -41,7 +35,8 @@ import (
4135
"knative.dev/eventing/pkg/reconciler/channel"
4236
"knative.dev/eventing/pkg/reconciler/containersource"
4337
"knative.dev/eventing/pkg/reconciler/eventtype"
44-
"knative.dev/eventing/pkg/reconciler/integrationsource"
38+
integrationsink "knative.dev/eventing/pkg/reconciler/integration/sink"
39+
integrationsource "knative.dev/eventing/pkg/reconciler/integration/source"
4540
"knative.dev/eventing/pkg/reconciler/parallel"
4641
"knative.dev/eventing/pkg/reconciler/pingsource"
4742
"knative.dev/eventing/pkg/reconciler/sequence"
@@ -52,36 +47,8 @@ import (
5247
)
5348

5449
func main() {
55-
5650
ctx := signals.NewContext()
5751

58-
port := os.Getenv("PROBES_PORT")
59-
if port == "" {
60-
port = "8080"
61-
}
62-
63-
// sets up liveness and readiness probes.
64-
server := http.Server{
65-
ReadTimeout: 5 * time.Second,
66-
Handler: http.HandlerFunc(handler),
67-
Addr: ":" + port,
68-
}
69-
70-
go func() {
71-
72-
go func() {
73-
<-ctx.Done()
74-
_ = server.Shutdown(ctx)
75-
}()
76-
77-
// start the web server on port and accept requests
78-
log.Printf("Readiness and health check server listening on port %s", port)
79-
80-
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
81-
log.Fatal(err)
82-
}
83-
}()
84-
8552
ctx = filteredFactory.WithSelectors(ctx,
8653
auth.OIDCLabelSelector,
8754
eventingtls.TrustBundleLabelSelector,
@@ -112,13 +79,10 @@ func main() {
11279

11380
// Sinks
11481
jobsink.NewController,
82+
integrationsink.NewController,
11583

11684
// Sugar
11785
sugarnamespace.NewController,
11886
sugartrigger.NewController,
11987
)
12088
}
121-
122-
func handler(w http.ResponseWriter, r *http.Request) {
123-
w.WriteHeader(http.StatusOK)
124-
}

cmd/jobsink/main.go

+74-53
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"crypto/md5" //nolint:gosec
2222
"crypto/tls"
23+
"encoding/hex"
2324
"fmt"
2425
"log"
2526
"net/http"
@@ -65,7 +66,7 @@ import (
6566
"knative.dev/eventing/pkg/utils"
6667
)
6768

68-
const component = "job-sink"
69+
const component = "job_sink"
6970

7071
func main() {
7172

@@ -231,11 +232,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
231232
return
232233
}
233234

234-
id := toIdHashLabelValue(event.Source(), event.ID())
235-
logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("id", id))
235+
jobName := toJobName(ref.Name, event.Source(), event.ID())
236+
logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
236237

237238
jobs, err := h.k8s.BatchV1().Jobs(js.GetNamespace()).List(r.Context(), metav1.ListOptions{
238-
LabelSelector: jobLabelSelector(ref, id),
239+
LabelSelector: jobLabelSelector(ref, jobName),
239240
Limit: 1,
240241
})
241242
if err != nil {
@@ -256,56 +257,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
256257
return
257258
}
258259

259-
jobName := kmeta.ChildName(ref.Name, id)
260-
261-
logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
262-
263-
jobSinkUID := js.GetUID()
264-
265-
or := metav1.OwnerReference{
266-
APIVersion: sinksv.SchemeGroupVersion.String(),
267-
Kind: sinks.JobSinkResource.Resource,
268-
Name: js.GetName(),
269-
UID: jobSinkUID,
270-
Controller: ptr.Bool(true),
271-
BlockOwnerDeletion: ptr.Bool(false),
272-
}
273-
274-
secret := &corev1.Secret{
275-
TypeMeta: metav1.TypeMeta{},
276-
ObjectMeta: metav1.ObjectMeta{
277-
Name: jobName,
278-
Namespace: ref.Namespace,
279-
Labels: map[string]string{
280-
sinks.JobSinkIDLabel: id,
281-
sinks.JobSinkNameLabel: ref.Name,
282-
},
283-
OwnerReferences: []metav1.OwnerReference{or},
284-
},
285-
Immutable: ptr.Bool(true),
286-
Data: map[string][]byte{"event": eventBytes},
287-
Type: corev1.SecretTypeOpaque,
288-
}
289-
290-
_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
291-
if err != nil && !apierrors.IsAlreadyExists(err) {
292-
logger.Warn("Failed to create secret", zap.Error(err))
293-
294-
w.Header().Add("Reason", err.Error())
295-
w.WriteHeader(http.StatusInternalServerError)
296-
return
297-
}
298-
299-
logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
260+
js = js.DeepCopy() // Do not modify informer copy.
261+
js.SetDefaults(ctx)
300262

301263
job := js.Spec.Job.DeepCopy()
302264
job.Name = jobName
303265
if job.Labels == nil {
304266
job.Labels = make(map[string]string, 4)
305267
}
306-
job.Labels[sinks.JobSinkIDLabel] = id
268+
job.Labels[sinks.JobSinkIDLabel] = jobName
307269
job.Labels[sinks.JobSinkNameLabel] = ref.Name
308-
job.OwnerReferences = append(job.OwnerReferences, or)
270+
job.OwnerReferences = append(job.OwnerReferences, metav1.OwnerReference{
271+
APIVersion: sinksv.SchemeGroupVersion.String(),
272+
Kind: sinks.JobSinkResource.Resource,
273+
Name: js.GetName(),
274+
UID: js.GetUID(),
275+
Controller: ptr.Bool(true),
276+
BlockOwnerDeletion: ptr.Bool(false),
277+
})
309278
var mountPathName string
310279
for i := range job.Spec.Template.Spec.Containers {
311280
found := false
@@ -346,14 +315,66 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
346315
})
347316
}
348317

349-
_, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
350-
if err != nil {
318+
logger.Debug("Creating job for event",
319+
zap.String("URI", r.RequestURI),
320+
zap.String("jobName", jobName),
321+
zap.Any("job", job),
322+
)
323+
324+
createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
325+
if err != nil && !apierrors.IsAlreadyExists(err) {
351326
logger.Warn("Failed to create job", zap.Error(err))
352327

353328
w.Header().Add("Reason", err.Error())
354329
w.WriteHeader(http.StatusInternalServerError)
355330
return
356331
}
332+
if apierrors.IsAlreadyExists(err) {
333+
logger.Debug("Job already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
334+
}
335+
336+
secret := &corev1.Secret{
337+
TypeMeta: metav1.TypeMeta{},
338+
ObjectMeta: metav1.ObjectMeta{
339+
Name: jobName,
340+
Namespace: ref.Namespace,
341+
Labels: map[string]string{
342+
sinks.JobSinkIDLabel: jobName,
343+
sinks.JobSinkNameLabel: ref.Name,
344+
},
345+
OwnerReferences: []metav1.OwnerReference{
346+
{
347+
APIVersion: "batch/v1",
348+
Kind: "Job",
349+
Name: createdJob.Name,
350+
UID: createdJob.UID,
351+
Controller: ptr.Bool(true),
352+
BlockOwnerDeletion: ptr.Bool(false),
353+
},
354+
},
355+
},
356+
Immutable: ptr.Bool(true),
357+
Data: map[string][]byte{"event": eventBytes},
358+
Type: corev1.SecretTypeOpaque,
359+
}
360+
361+
logger.Debug("Creating secret for event",
362+
zap.String("URI", r.RequestURI),
363+
zap.String("jobName", jobName),
364+
zap.Any("secret.metadata", secret.ObjectMeta),
365+
)
366+
367+
_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
368+
if err != nil && !apierrors.IsAlreadyExists(err) {
369+
logger.Warn("Failed to create secret", zap.Error(err))
370+
371+
w.Header().Add("Reason", err.Error())
372+
w.WriteHeader(http.StatusInternalServerError)
373+
return
374+
}
375+
if apierrors.IsAlreadyExists(err) {
376+
logger.Debug("Secret already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
377+
}
357378

358379
w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID()))
359380
w.WriteHeader(http.StatusAccepted)
@@ -391,8 +412,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http.
391412
eventSource := parts[6]
392413
eventID := parts[8]
393414

394-
id := toIdHashLabelValue(eventSource, eventID)
395-
jobName := kmeta.ChildName(ref.Name, id)
415+
jobName := toJobName(ref.Name, eventSource, eventID)
396416

397417
job, err := h.k8s.BatchV1().Jobs(ref.Namespace).Get(r.Context(), jobName, metav1.GetOptions{})
398418
if err != nil {
@@ -445,6 +465,7 @@ func jobLabelSelector(ref types.NamespacedName, id string) string {
445465
return fmt.Sprintf("%s=%s,%s=%s", sinks.JobSinkIDLabel, id, sinks.JobSinkNameLabel, ref.Name)
446466
}
447467

448-
func toIdHashLabelValue(source, id string) string {
449-
return utils.ToDNS1123Subdomain(fmt.Sprintf("%s", md5.Sum([]byte(fmt.Sprintf("%s-%s", source, id))))) //nolint:gosec
468+
func toJobName(js string, source, id string) string {
469+
h := md5.Sum([]byte(source + id)) //nolint:gosec
470+
return kmeta.ChildName(js+"-", utils.ToDNS1123Subdomain(hex.EncodeToString(h[:])))
450471
}

0 commit comments

Comments
 (0)