Skip to content

Commit c2c129b

Browse files
authored
feat: support Airbyte DB with Postgres 17 (#162)
1 parent c942480 commit c2c129b

File tree

12 files changed

+340
-447
lines changed

12 files changed

+340
-447
lines changed

internal/cmd/local/helm/airbyte_values.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ type ValuesOpts struct {
1717
ImagePullSecret string
1818
DisableAuth bool
1919
LocalStorage bool
20+
EnablePsql17 bool
2021
}
2122

23+
const (
24+
Psql17AirbyteTag = "1.7.0-17"
25+
)
26+
2227
func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) {
2328
span := trace.SpanFromContext(ctx)
2429

@@ -33,6 +38,10 @@ func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) {
3338
vals = append(vals, "global.storage.type=local")
3439
}
3540

41+
if opts.EnablePsql17 {
42+
vals = append(vals, "postgresql.image.tag="+Psql17AirbyteTag)
43+
}
44+
3645
span.SetAttributes(
3746
attribute.Bool("low-resource-mode", opts.LowResourceMode),
3847
attribute.Bool("insecure-cookies", opts.InsecureCookies),

internal/cmd/local/local/cmd.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package local
22

33
import (
4+
"errors"
45
"fmt"
6+
"io/fs"
57
"net/http"
68
"os"
9+
"path"
710
"path/filepath"
811
"time"
912

@@ -12,6 +15,7 @@ import (
1215
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
1316
"github.com/airbytehq/abctl/internal/cmd/local/paths"
1417
"github.com/airbytehq/abctl/internal/common"
18+
"github.com/airbytehq/abctl/internal/pgdata"
1519
"k8s.io/client-go/rest"
1620

1721
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
@@ -199,7 +203,7 @@ func DefaultK8s(kubecfg, kubectx string) (k8s.Client, error) {
199203
// local filesystem. It returns true if the MinIO data directory exists.
200204
// Otherwise it returns false.
201205
func SupportMinio() (bool, error) {
202-
minioPath := filepath.Join(paths.Data, pvMinio)
206+
minioPath := filepath.Join(paths.Data, paths.PvMinio)
203207
f, err := os.Stat(minioPath)
204208
if err != nil {
205209
if os.IsNotExist(err) {
@@ -211,3 +215,23 @@ func SupportMinio() (bool, error) {
211215

212216
return f.IsDir(), nil
213217
}
218+
219+
// EnablePsql17 checks if PostgreSQL data needs patching by examining the
220+
// local PostgreSQL data directory. It returns true if the directory doesn't
221+
// exist or contains PostgreSQL version 17. Otherwise it returns false.
222+
func EnablePsql17() (bool, error) {
223+
pgData := pgdata.New(&pgdata.Config{
224+
Path: path.Join(paths.Data, paths.PvPsql, "pgdata"),
225+
})
226+
227+
pgVersion, err := pgData.Version()
228+
if err != nil && !errors.Is(err, fs.ErrNotExist) {
229+
return false, fmt.Errorf("failed to determine if any previous psql version exists: %w", err)
230+
}
231+
232+
if pgVersion == "" || pgVersion == "17" {
233+
return true, nil
234+
}
235+
236+
return false, nil
237+
}

internal/cmd/local/local/install.go

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ import (
1616
"github.com/airbytehq/abctl/internal/cmd/local/helm"
1717
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
1818
"github.com/airbytehq/abctl/internal/cmd/local/localerr"
19-
"github.com/airbytehq/abctl/internal/cmd/local/migrate"
2019
"github.com/airbytehq/abctl/internal/cmd/local/paths"
2120
"github.com/airbytehq/abctl/internal/common"
22-
"github.com/airbytehq/abctl/internal/telemetry"
21+
"github.com/airbytehq/abctl/internal/merge"
2322
"github.com/airbytehq/abctl/internal/trace"
2423
helmclient "github.com/mittwald/go-helm-client"
2524
"github.com/pterm/pterm"
@@ -36,11 +35,6 @@ import (
3635
)
3736

3837
const (
39-
// persistent volume constants, these are named to match the values given in the helm chart
40-
pvMinio = "airbyte-minio-pv"
41-
pvLocal = "airbyte-local-pv"
42-
pvPsql = "airbyte-volume-db"
43-
4438
// persistent volume claim constants, these are named to match the values given in the helm chart
4539
pvcMinio = "airbyte-minio-pv-claim-airbyte-minio-0"
4640
pvcLocal = "airbyte-storage-pvc"
@@ -52,10 +46,10 @@ type InstallOpts struct {
5246
HelmValuesYaml string
5347
AirbyteChartLoc string
5448
Secrets []string
55-
Migrate bool
5649
Hosts []string
5750
ExtraVolumeMounts []k8s.ExtraVolumeMount
5851
LocalStorage bool
52+
EnablePsql17 bool
5953

6054
DockerServer string
6155
DockerUser string
@@ -158,16 +152,23 @@ func (c *Command) persistentVolumeClaim(ctx context.Context, namespace, name, vo
158152

159153
// PrepImages determines the docker images needed by the chart, pulls them, and loads them into the cluster.
160154
// This is best effort, so errors are dropped here.
161-
func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts) {
155+
func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts, withImages ...string) {
162156
ctx, span := trace.NewSpan(ctx, "command.PrepImages")
163157
defer span.End()
164158

159+
for _, image := range withImages {
160+
pterm.Info.Printfln("Patching image %s", image)
161+
}
162+
165163
manifest, err := images.FindImagesFromChart(opts.HelmValuesYaml, opts.AirbyteChartLoc, opts.HelmChartVersion)
166164
if err != nil {
167165
pterm.Debug.Printfln("error building image manifest: %s", err)
168166
return
169167
}
170168

169+
// Merge images with the manifest.
170+
manifest = merge.DockerImages(manifest, withImages)
171+
171172
cluster.LoadImages(ctx, c.docker.Client, manifest)
172173
}
173174

@@ -192,40 +193,31 @@ func (c *Command) Install(ctx context.Context, opts *InstallOpts) error {
192193
pterm.Info.Printfln("Namespace '%s' already exists", common.AirbyteNamespace)
193194
}
194195

196+
// Storage volumes.
195197
if opts.LocalStorage {
196-
if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvLocal); err != nil {
198+
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvLocal); err != nil {
199+
return err
200+
}
201+
202+
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcLocal, paths.PvLocal); err != nil {
197203
return err
198204
}
199205
} else {
200-
if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvMinio); err != nil {
206+
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvMinio); err != nil {
201207
return err
202208
}
203-
}
204209

205-
if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvPsql); err != nil {
206-
return err
207-
}
208-
209-
span.SetAttributes(attribute.Bool("migrate", opts.Migrate))
210-
if opts.Migrate {
211-
c.spinner.UpdateText("Migrating airbyte data")
212-
if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return migrate.FromDockerVolume(ctx, c.docker.Client, "airbyte_db") }); err != nil {
213-
pterm.Error.Println("Failed to migrate data from previous Airbyte installation")
214-
return fmt.Errorf("unable to migrate data from previous airbyte installation: %w", err)
210+
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcMinio, paths.PvMinio); err != nil {
211+
return err
215212
}
216213
}
217214

218-
if opts.LocalStorage {
219-
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcLocal, pvLocal); err != nil {
220-
return err
221-
}
222-
} else {
223-
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcMinio, pvMinio); err != nil {
224-
return err
225-
}
215+
// PSQL volumes.
216+
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvPsql); err != nil {
217+
return err
226218
}
227219

228-
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcPsql, pvPsql); err != nil {
220+
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcPsql, paths.PvPsql); err != nil {
229221
return err
230222
}
231223

internal/cmd/local/local/testdata/expected-default.values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ global:
1313
memory: 4Gi
1414
storage:
1515
type: local
16+
postgresql:
17+
image:
18+
tag: 1.7.0-17

internal/cmd/local/local_install.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ type InstallCmd struct {
2626
Host []string `help:"HTTP ingress host."`
2727
InsecureCookies bool `help:"Allow cookies to be served over HTTP."`
2828
LowResourceMode bool `help:"Run Airbyte in low resource mode."`
29-
Migrate bool `help:"Migrate data from a previous Docker Compose Airbyte installation."`
3029
NoBrowser bool `help:"Disable launching a browser post install."`
3130
Port int `default:"8000" help:"HTTP ingress port."`
3231
Secret []string `type:"existingfile" help:"An Airbyte helm chart secret file."`
@@ -60,13 +59,22 @@ func (i *InstallCmd) InstallOpts(ctx context.Context, user string) (*local.Insta
6059
pterm.Warning.Println("Found MinIO physical volume. Consider migrating it to local storage (see project docs)")
6160
}
6261

62+
enablePsql17, err := local.EnablePsql17()
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
if !enablePsql17 {
68+
pterm.Warning.Println("Psql 13 detected. Consider upgrading to 17")
69+
}
70+
6371
opts := &local.InstallOpts{
6472
HelmChartVersion: i.ChartVersion,
6573
AirbyteChartLoc: helm.LocateLatestAirbyteChart(i.ChartVersion, i.Chart),
6674
Secrets: i.Secret,
67-
Migrate: i.Migrate,
6875
Hosts: i.Host,
6976
LocalStorage: !supportMinio,
77+
EnablePsql17: enablePsql17,
7078
ExtraVolumeMounts: extraVolumeMounts,
7179
DockerServer: i.DockerServer,
7280
DockerUser: i.DockerUsername,
@@ -81,6 +89,7 @@ func (i *InstallCmd) InstallOpts(ctx context.Context, user string) (*local.Insta
8189
LowResourceMode: i.LowResourceMode,
8290
DisableAuth: i.DisableAuth,
8391
LocalStorage: !supportMinio,
92+
EnablePsql17: enablePsql17,
8493
}
8594

8695
if opts.DockerAuth() {
@@ -115,11 +124,18 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t
115124
return fmt.Errorf("unable to determine docker installation status: %w", err)
116125
}
117126

127+
// Overrides Helm chart images.
128+
overrideImages := []string{}
129+
118130
opts, err := i.InstallOpts(ctx, telClient.User())
119131
if err != nil {
120132
return err
121133
}
122134

135+
if opts.EnablePsql17 {
136+
overrideImages = append(overrideImages, "airbyte/db:"+helm.Psql17AirbyteTag)
137+
}
138+
123139
return telClient.Wrap(ctx, telemetry.Install, func() error {
124140
spinner.UpdateText(fmt.Sprintf("Checking for existing Kubernetes cluster '%s'", provider.ClusterName))
125141

@@ -180,7 +196,7 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t
180196
}
181197

182198
spinner.UpdateText("Pulling images")
183-
lc.PrepImages(ctx, cluster, opts)
199+
lc.PrepImages(ctx, cluster, opts, overrideImages...)
184200

185201
if err := lc.Install(ctx, opts); err != nil {
186202
spinner.Fail("Unable to install Airbyte locally")

internal/cmd/local/local_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func TestInstallOpts(t *testing.T) {
158158
HelmValuesYaml: string(b),
159159
AirbyteChartLoc: "/test/path/to/chart",
160160
LocalStorage: true,
161+
EnablePsql17: true,
161162
}
162163
opts, err := cmd.InstallOpts(context.Background(), "test-user")
163164
if err != nil {

0 commit comments

Comments
 (0)