Skip to content

Commit bd54328

Browse files
authored
Merge pull request #4321 from ricardomaraschini/implement-watcher-for-oci-bundles
feat: implement watcher for oci bundles
2 parents 006b8be + debf414 commit bd54328

File tree

1 file changed

+142
-35
lines changed

1 file changed

+142
-35
lines changed

pkg/component/worker/ocibundle.go

+142-35
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,31 @@ import (
2121
"fmt"
2222
"os"
2323
"path/filepath"
24+
"sync"
2425
"time"
2526

2627
"github.com/avast/retry-go"
2728
"github.com/containerd/containerd"
2829
"github.com/containerd/containerd/platforms"
30+
"github.com/fsnotify/fsnotify"
31+
"github.com/sirupsen/logrus"
32+
2933
"github.com/k0sproject/k0s/internal/pkg/dir"
3034
"github.com/k0sproject/k0s/pkg/component/manager"
3135
"github.com/k0sproject/k0s/pkg/component/prober"
3236
"github.com/k0sproject/k0s/pkg/config"
3337
"github.com/k0sproject/k0s/pkg/constant"
34-
"github.com/sirupsen/logrus"
38+
"github.com/k0sproject/k0s/pkg/debounce"
3539
)
3640

3741
// OCIBundleReconciler tries to import OCI bundle into the running containerd instance
3842
type OCIBundleReconciler struct {
39-
k0sVars *config.CfgVars
40-
log *logrus.Entry
43+
k0sVars *config.CfgVars
44+
log *logrus.Entry
45+
alreadyImported map[string]time.Time
46+
mtx sync.Mutex
47+
cancel context.CancelFunc
48+
end chan struct{}
4149
*prober.EventEmitter
4250
}
4351

@@ -46,60 +54,155 @@ var _ manager.Component = (*OCIBundleReconciler)(nil)
4654
// NewOCIBundleReconciler builds new reconciler
4755
func NewOCIBundleReconciler(vars *config.CfgVars) *OCIBundleReconciler {
4856
return &OCIBundleReconciler{
49-
k0sVars: vars,
50-
log: logrus.WithField("component", "OCIBundleReconciler"),
51-
EventEmitter: prober.NewEventEmitter(),
57+
k0sVars: vars,
58+
log: logrus.WithField("component", "OCIBundleReconciler"),
59+
EventEmitter: prober.NewEventEmitter(),
60+
alreadyImported: map[string]time.Time{},
61+
end: make(chan struct{}),
5262
}
5363
}
5464

5565
func (a *OCIBundleReconciler) Init(_ context.Context) error {
5666
return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode)
5767
}
5868

59-
func (a *OCIBundleReconciler) Start(ctx context.Context) error {
60-
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
61-
if err != nil {
62-
a.Emit("can't read bundles directory")
63-
return fmt.Errorf("can't read bundles directory")
64-
}
65-
a.EmitWithPayload("importing OCI bundles", files)
66-
if len(files) == 0 {
67-
return nil
68-
}
69+
// loadOne connects to containerd and imports the provided OCI bundle.
70+
func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error {
6971
var client *containerd.Client
7072
sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock")
71-
err = retry.Do(func() error {
72-
client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())))
73+
if err := retry.Do(func() (err error) {
74+
client, err = containerd.New(
75+
sock,
76+
containerd.WithDefaultNamespace("k8s.io"),
77+
containerd.WithDefaultPlatform(
78+
platforms.OnlyStrict(platforms.DefaultSpec()),
79+
),
80+
)
7381
if err != nil {
74-
a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock)
75-
return err
82+
return fmt.Errorf("failed to connect to containerd: %w", err)
7683
}
77-
_, err := client.ListImages(ctx)
78-
if err != nil {
79-
a.log.WithError(err).Errorf("can't use containerd client")
80-
return err
84+
if _, err = client.ListImages(ctx); err != nil {
85+
return fmt.Errorf("failed to communicate with containerd: %w", err)
8186
}
8287
return nil
83-
}, retry.Context(ctx), retry.Delay(time.Second*5))
84-
if err != nil {
85-
a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err})
86-
return fmt.Errorf("can't connect to containerd socket %s: %w", sock, err)
88+
}, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil {
89+
return err
8790
}
8891
defer client.Close()
92+
if err := a.unpackBundle(ctx, client, fpath); err != nil {
93+
return fmt.Errorf("failed to process OCI bundle: %w", err)
94+
}
95+
return nil
96+
}
97+
98+
// loadAll loads all OCI bundle files into containerd. Read all files from the OCI bundle
99+
// directory and loads them one by one. Errors are logged but not returned, upon failure
100+
// in one file this function logs the error and moves to the next file. Files are indexed
101+
// by name and imported only once (if the file has not been modified).
102+
func (a *OCIBundleReconciler) loadAll(ctx context.Context) {
103+
// We are going to consume everything in the directory so we block. This keeps
104+
// things simple and avoid the need to handle two imports of the same file at the
105+
// same time without requiring locks based on file path.
106+
a.mtx.Lock()
107+
defer a.mtx.Unlock()
89108

109+
a.log.Info("Loading OCI bundles directory")
110+
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
111+
if err != nil {
112+
a.log.WithError(err).Errorf("Failed to read bundles directory")
113+
return
114+
}
115+
a.EmitWithPayload("importing OCI bundles", files)
90116
for _, file := range files {
91-
if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil {
92-
a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err})
93-
a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name())
94-
return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err)
117+
fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name())
118+
finfo, err := os.Stat(fpath)
119+
if err != nil {
120+
a.log.WithError(err).Errorf("failed to stat %s", fpath)
121+
continue
122+
}
123+
124+
modtime := finfo.ModTime()
125+
if when, ok := a.alreadyImported[fpath]; ok && when.Equal(modtime) {
126+
continue
127+
}
128+
129+
a.log.Infof("Loading OCI bundle %s", fpath)
130+
if err := a.loadOne(ctx, fpath); err != nil {
131+
a.log.WithError(err).Errorf("Failed to load OCI bundle %s", fpath)
132+
continue
133+
}
134+
135+
a.alreadyImported[fpath] = modtime
136+
a.log.Infof("OCI bundle %s loaded", fpath)
137+
}
138+
a.Emit("finished importing OCI bundles")
139+
}
140+
141+
// installWatcher creates a fs watcher on the oci bundle directory. This function calls
142+
// loadAll every time a new file is created or updated on the oci directory. Events are
143+
// debounced with a timeout of 10 seconds. Watcher is started with a buffer so we don't
144+
// miss events.
145+
func (a *OCIBundleReconciler) installWatcher(ctx context.Context) error {
146+
watcher, err := fsnotify.NewBufferedWatcher(10)
147+
if err != nil {
148+
return fmt.Errorf("failed to create watcher: %w", err)
149+
}
150+
151+
if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil {
152+
return fmt.Errorf("failed to add watcher: %w", err)
153+
}
154+
155+
debouncer := debounce.Debouncer[fsnotify.Event]{
156+
Input: watcher.Events,
157+
Timeout: 10 * time.Second,
158+
Filter: func(item fsnotify.Event) bool {
159+
switch item.Op {
160+
case fsnotify.Remove, fsnotify.Rename:
161+
return false
162+
}
163+
return true
164+
},
165+
Callback: func(ev fsnotify.Event) {
166+
a.loadAll(ctx)
167+
},
168+
}
169+
170+
go func() {
171+
for {
172+
if err, ok := <-watcher.Errors; ok {
173+
a.log.WithError(err).Error("Error watching OCI bundle directory")
174+
continue
175+
}
176+
return
177+
}
178+
}()
179+
180+
go func() {
181+
defer close(a.end)
182+
a.log.Infof("Started to watch events on %s", a.k0sVars.OCIBundleDir)
183+
_ = debouncer.Run(ctx)
184+
if err := watcher.Close(); err != nil {
185+
a.log.Errorf("Failed to close watcher: %s", err)
95186
}
96-
a.EmitWithPayload("unpacked OCI bundle", file.Name())
187+
a.log.Info("OCI bundle watch bouncer ended")
188+
}()
189+
190+
return nil
191+
}
192+
193+
// Starts initiate the OCI bundle loader. It does an initial load of the directory and
194+
// once it is done, it starts a watcher on its own goroutine.
195+
func (a *OCIBundleReconciler) Start(ctx context.Context) error {
196+
ictx, cancel := context.WithCancel(context.Background())
197+
a.cancel = cancel
198+
if err := a.installWatcher(ictx); err != nil {
199+
return fmt.Errorf("failed to install watcher: %w", err)
97200
}
98-
a.Emit("finished importing OCI bundle")
201+
a.loadAll(ictx)
99202
return nil
100203
}
101204

102-
func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
205+
func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
103206
r, err := os.Open(bundlePath)
104207
if err != nil {
105208
return fmt.Errorf("can't open bundle file %s: %w", bundlePath, err)
@@ -127,5 +230,9 @@ func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *container
127230
}
128231

129232
func (a *OCIBundleReconciler) Stop() error {
233+
a.log.Info("Stopping OCI bundle loader watcher")
234+
a.cancel()
235+
<-a.end
236+
a.log.Info("OCI bundle loader stopped")
130237
return nil
131238
}

0 commit comments

Comments
 (0)