Skip to content

Commit 985bb74

Browse files
feat: pre-pull images and sideload them into the cluster (#143)
1 parent 91789e6 commit 985bb74

File tree

10 files changed

+302
-52
lines changed

10 files changed

+302
-52
lines changed

internal/cmd/images/manifest_cmd.go

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ import (
77
"strings"
88

99
"github.com/airbytehq/abctl/internal/cmd/local/helm"
10-
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
10+
"github.com/airbytehq/abctl/internal/common"
1111
"github.com/airbytehq/abctl/internal/trace"
1212
helmlib "github.com/mittwald/go-helm-client"
1313
"helm.sh/helm/v3/pkg/repo"
1414

15-
"github.com/airbytehq/abctl/internal/common"
1615
appsv1 "k8s.io/api/apps/v1"
1716
batchv1 "k8s.io/api/batch/v1"
1817
corev1 "k8s.io/api/core/v1"
@@ -27,16 +26,11 @@ type ManifestCmd struct {
2726
Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."`
2827
}
2928

30-
func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
29+
func (c *ManifestCmd) Run(ctx context.Context) error {
3130
ctx, span := trace.NewSpan(ctx, "images manifest")
3231
defer span.End()
3332

34-
client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace)
35-
if err != nil {
36-
return err
37-
}
38-
39-
images, err := c.findAirbyteImages(ctx, client)
33+
images, err := c.findAirbyteImages(ctx)
4034
if err != nil {
4135
return err
4236
}
@@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
4842
return nil
4943
}
5044

51-
func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) {
45+
func (c *ManifestCmd) findAirbyteImages(ctx context.Context) ([]string, error) {
5246
valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{
5347
ValuesFile: c.Values,
5448
})
@@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client)
5751
}
5852

5953
airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart)
60-
return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion)
54+
return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion)
6155
}
6256

63-
func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) {
64-
err := client.AddOrUpdateChartRepo(repo.Entry{
57+
func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) {
58+
59+
// sharing a helm client with the install code causes some weird issues,
60+
// and templating the chart doesn't need details about the k8s provider,
61+
// we create a throwaway helm client here.
62+
client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace))
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
err = client.AddOrUpdateChartRepo(repo.Entry{
6568
Name: common.AirbyteRepoName,
6669
URL: common.AirbyteRepoURL,
6770
})
@@ -88,7 +91,7 @@ func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion
8891
// It returns a unique, sorted list of images found.
8992
func findAllImages(chartYaml string) []string {
9093
objs := decodeK8sResources(chartYaml)
91-
imageSet := set[string]{}
94+
imageSet := common.Set[string]{}
9295

9396
for _, obj := range objs {
9497

@@ -98,7 +101,7 @@ func findAllImages(chartYaml string) []string {
98101
if strings.HasSuffix(z.Name, "airbyte-env") {
99102
for k, v := range z.Data {
100103
if strings.HasSuffix(k, "_IMAGE") {
101-
imageSet.add(v)
104+
imageSet.Add(v)
102105
}
103106
}
104107
}
@@ -116,15 +119,15 @@ func findAllImages(chartYaml string) []string {
116119
}
117120

118121
for _, c := range podSpec.InitContainers {
119-
imageSet.add(c.Image)
122+
imageSet.Add(c.Image)
120123
}
121124
for _, c := range podSpec.Containers {
122-
imageSet.add(c.Image)
125+
imageSet.Add(c.Image)
123126
}
124127
}
125128

126129
var out []string
127-
for _, k := range imageSet.items() {
130+
for _, k := range imageSet.Items() {
128131
if k != "" {
129132
out = append(out, k)
130133
}
@@ -149,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object {
149152
}
150153
return out
151154
}
152-
153-
type set[T comparable] struct {
154-
vals map[T]struct{}
155-
}
156-
157-
func (s *set[T]) add(v T) {
158-
if s.vals == nil {
159-
s.vals = map[T]struct{}{}
160-
}
161-
s.vals[v] = struct{}{}
162-
}
163-
164-
func (s *set[T]) items() []T {
165-
out := make([]T, len(s.vals))
166-
for k := range s.vals {
167-
out = append(out, k)
168-
}
169-
return out
170-
}

internal/cmd/images/manifest_cmd_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ func getHelmTestClient(t *testing.T) helm.Client {
2020
}
2121

2222
func TestManifestCmd(t *testing.T) {
23-
client := getHelmTestClient(t)
2423
cmd := ManifestCmd{
2524
ChartVersion: "1.1.0",
2625
}
27-
actual, err := cmd.findAirbyteImages(context.Background(), client)
26+
actual, err := cmd.findAirbyteImages(context.Background())
2827
if err != nil {
2928
t.Fatal(err)
3029
}
@@ -48,12 +47,11 @@ func TestManifestCmd(t *testing.T) {
4847
}
4948

5049
func TestManifestCmd_Enterprise(t *testing.T) {
51-
client := getHelmTestClient(t)
5250
cmd := ManifestCmd{
5351
ChartVersion: "1.1.0",
5452
Values: "testdata/enterprise.values.yaml",
5553
}
56-
actual, err := cmd.findAirbyteImages(context.Background(), client)
54+
actual, err := cmd.findAirbyteImages(context.Background())
5755
if err != nil {
5856
t.Fatal(err)
5957
}
@@ -81,13 +79,12 @@ func TestManifestCmd_Enterprise(t *testing.T) {
8179
}
8280

8381
func TestManifestCmd_Nightly(t *testing.T) {
84-
client := getHelmTestClient(t)
8582
cmd := ManifestCmd{
8683
// This version includes chart fixes that expose images more consistently and completely.
8784
ChartVersion: "1.1.0-nightly-1728428783-9025e1a46e",
8885
Values: "testdata/enterprise.values.yaml",
8986
}
90-
actual, err := cmd.findAirbyteImages(context.Background(), client)
87+
actual, err := cmd.findAirbyteImages(context.Background())
9188
if err != nil {
9289
t.Fatal(err)
9390
}

internal/cmd/local/docker/docker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Client interface {
4343

4444
ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
4545
ImagePull(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
46+
ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error)
4647

4748
ServerVersion(ctx context.Context) (types.Version, error)
4849
VolumeInspect(ctx context.Context, volumeID string) (volume.Volume, error)

internal/cmd/local/docker/dockertest/dockertest.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type MockClient struct {
2626
FnContainerExecStart func(ctx context.Context, execID string, config container.ExecStartOptions) error
2727
FnImageList func(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
2828
FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
29+
FnImageSave func(ctx context.Context, imageIDs []string) (io.ReadCloser, error)
2930
FnServerVersion func(ctx context.Context) (types.Version, error)
3031
FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error)
3132
FnInfo func(ctx context.Context) (system.Info, error)
@@ -82,6 +83,10 @@ func (m MockClient) ImagePull(ctx context.Context, refStr string, options image.
8283
return m.FnImagePull(ctx, refStr, options)
8384
}
8485

86+
func (m MockClient) ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) {
87+
return m.ImageSave(ctx, imageIDs)
88+
}
89+
8590
func (m MockClient) ServerVersion(ctx context.Context) (types.Version, error) {
8691
return m.FnServerVersion(ctx)
8792
}

internal/cmd/local/helm/helm.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ type Client interface {
2626
TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error)
2727
}
2828

29+
func ClientOptions(namespace string) *helmclient.Options {
30+
logger := helmLogger{}
31+
return &helmclient.Options{
32+
Namespace: namespace,
33+
Output: logger,
34+
DebugLog: logger.Debug,
35+
Debug: true,
36+
RepositoryCache: paths.HelmRepoCache,
37+
RepositoryConfig: paths.HelmRepoConfig,
38+
}
39+
}
40+
2941
// New returns the default helm client
3042
func New(kubecfg, kubectx, namespace string) (Client, error) {
3143
k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
@@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace string) (Client, error) {
3850
return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err)
3951
}
4052

41-
logger := helmLogger{}
4253
helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{
43-
Options: &helmclient.Options{
44-
Namespace: namespace,
45-
Output: logger,
46-
DebugLog: logger.Debug,
47-
Debug: true,
48-
RepositoryCache: paths.HelmRepoCache,
49-
RepositoryConfig: paths.HelmRepoConfig,
50-
},
54+
Options: ClientOptions(namespace),
5155
RestConfig: restCfg,
5256
})
5357
if err != nil {

internal/cmd/local/k8s/cluster.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"time"
99

10+
"github.com/airbytehq/abctl/internal/cmd/local/docker"
1011
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
1112
"github.com/airbytehq/abctl/internal/cmd/local/paths"
1213
"github.com/airbytehq/abctl/internal/trace"
@@ -30,6 +31,7 @@ type Cluster interface {
3031
Delete(ctx context.Context) error
3132
// Exists returns true if the cluster exists, false otherwise.
3233
Exists(ctx context.Context) bool
34+
LoadImages(ctx context.Context, dockerClient docker.Client, images []string)
3335
}
3436

3537
// interface sanity check
@@ -110,6 +112,23 @@ func (k *kindCluster) Exists(ctx context.Context) bool {
110112
return false
111113
}
112114

115+
// LoadImages pulls images from Docker Hub, and loads them into the kind cluster.
116+
// This is a best-effort optimization, which is why it doesn't return an error.
117+
// It's possible that only some images will be loaded.
118+
func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) {
119+
// Get a list of Kind nodes.
120+
nodes, err := k.p.ListNodes(k.clusterName)
121+
if err != nil {
122+
pterm.Debug.Printfln("failed to load images: %s", err)
123+
return
124+
}
125+
126+
err = loadImages(ctx, dockerClient, nodes, images)
127+
if err != nil {
128+
pterm.Debug.Printfln("failed to load images: %s", err)
129+
}
130+
}
131+
113132
func formatKindErr(err error) error {
114133
var kindErr *kindExec.RunError
115134
if errors.As(err, &kindErr) {

0 commit comments

Comments
 (0)