diff --git a/pkg/plugin/sdk/diff/diff.go b/pkg/plugin/sdk/diff/diff.go new file mode 100644 index 0000000000..1f373e0620 --- /dev/null +++ b/pkg/plugin/sdk/diff/diff.go @@ -0,0 +1,480 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "fmt" + "reflect" + "strconv" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type differ struct { + ignoreAddingMapKeys bool + equateEmpty bool + compareNumberAndNumericString bool + compareBooleanAndBooleanString bool + ignoredPaths map[string]struct{} + ignoreConfig map[string][]string + + result *Result +} + +type Option func(*differ) + +// WithIgnoreAddingMapKeys configures differ to ignore all map keys +// that were added to the second object and missing in the first one. +func WithIgnoreAddingMapKeys() Option { + return func(d *differ) { + d.ignoreAddingMapKeys = true + } +} + +// WithEquateEmpty configures differ to consider all maps/slides with a length of zero and nil to be equal. +func WithEquateEmpty() Option { + return func(d *differ) { + d.equateEmpty = true + } +} + +// WithCompareNumberAndNumericString configures differ to compare a number with a numeric string. +// Differ parses the string to number before comparing their values. +// e.g. 1.5 == "1.5" +func WithCompareNumberAndNumericString() Option { + return func(d *differ) { + d.compareNumberAndNumericString = true + } +} + +// WithCompareBooleanAndBooleanString configures differ to compare a boolean with a string. +// Differ parses the string to boolean before comparing their values. +// e.g. true == "true" +func WithCompareBooleanAndBooleanString() Option { + return func(d *differ) { + d.compareBooleanAndBooleanString = true + } +} + +// WithIgnoreConfig configures ignored fields. +func WithIgnoreConfig(config map[string][]string) Option { + return func(d *differ) { + d.ignoreConfig = config + } +} + +func (d *differ) initIgnoredPaths(key string) { + paths := d.ignoreConfig[key] + d.ignoredPaths = make(map[string]struct{}, len(paths)) + + for _, path := range paths { + d.ignoredPaths[path] = struct{}{} + } +} + +// DiffUnstructureds calculates the diff between two unstructured objects. +// If you compare non-k8s manifests, use DiffStructureds instead. +func DiffUnstructureds(x, y unstructured.Unstructured, key string, opts ...Option) (*Result, error) { + var ( + path = []PathStep{} + vx = reflect.ValueOf(x.Object) + vy = reflect.ValueOf(y.Object) + d = &differ{result: &Result{}} + ) + for _, opt := range opts { + opt(d) + } + + d.initIgnoredPaths(key) + + if err := d.diff(path, vx, vy); err != nil { + return nil, err + } + + d.result.sort() + return d.result, nil +} + +func (d *differ) diff(path []PathStep, vx, vy reflect.Value) error { + if !vx.IsValid() { + if d.equateEmpty && isEmptyInterface(vy) { + return nil + } + + d.addNode(path, nil, vy.Type(), vx, vy) + return nil + } + + if !vy.IsValid() { + if d.equateEmpty && isEmptyInterface(vx) { + return nil + } + + d.addNode(path, vx.Type(), nil, vx, vy) + return nil + } + + if isNumberValue(vx) && isNumberValue(vy) { + return d.diffNumber(path, vx, vy) + } + + if d.compareNumberAndNumericString { + if isNumberValue(vx) { + if y, ok := convertToNumber(vy); ok { + return d.diffNumber(path, vx, y) + } + } + + if isNumberValue(vy) { + if x, ok := convertToNumber(vx); ok { + return d.diffNumber(path, x, vy) + } + } + } + + if isBooleanValue(vx) && isBooleanValue(vy) { + return d.diffBool(path, vx, vy) + } + + if d.compareBooleanAndBooleanString { + if isBooleanValue(vx) { + if y, ok := convertToBoolean(vy); ok { + return d.diffBool(path, vx, y) + } + } + + if isBooleanValue(vy) { + if x, ok := convertToBoolean(vx); ok { + return d.diffBool(path, x, vy) + } + } + } + + if vx.Type() != vy.Type() { + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil + } + + switch vx.Kind() { + case reflect.Map: + return d.diffMap(path, vx, vy) + + case reflect.Slice, reflect.Array: + return d.diffSlice(path, vx, vy) + + case reflect.Interface: + return d.diffInterface(path, vx, vy) + + case reflect.String: + return d.diffString(path, vx, vy) + + case reflect.Bool: + return d.diffBool(path, vx, vy) + + default: + return fmt.Errorf("%v kind is not handled", vx.Kind()) + } +} + +func (d *differ) diffSlice(path []PathStep, vx, vy reflect.Value) error { + if vx.IsNil() || vy.IsNil() { + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil + } + + minLen := vx.Len() + if minLen > vy.Len() { + minLen = vy.Len() + } + + for i := 0; i < minLen; i++ { + nextPath := newSlicePath(path, i) + nextValueX := vx.Index(i) + nextValueY := vy.Index(i) + if err := d.diff(nextPath, nextValueX, nextValueY); err != nil { + return err + } + } + + for i := minLen; i < vx.Len(); i++ { + nextPath := newSlicePath(path, i) + nextValueX := vx.Index(i) + d.addNode(nextPath, nextValueX.Type(), nextValueX.Type(), nextValueX, reflect.Value{}) + } + + for i := minLen; i < vy.Len(); i++ { + nextPath := newSlicePath(path, i) + nextValueY := vy.Index(i) + d.addNode(nextPath, nextValueY.Type(), nextValueY.Type(), reflect.Value{}, nextValueY) + } + + return nil +} + +func (d *differ) diffMap(path []PathStep, vx, vy reflect.Value) error { + if vx.IsNil() || vy.IsNil() { + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil + } + + keys := append(vx.MapKeys(), vy.MapKeys()...) + checks := make(map[string]struct{}) + + for _, k := range keys { + if k.Kind() != reflect.String { + return fmt.Errorf("unsupport %v as key type of a map", k.Kind()) + } + if _, ok := checks[k.String()]; ok { + continue + } + + nextValueY := vy.MapIndex(k) + // Don't need to check the key existing in the first(LiveManifest) one but missing in the seccond(GitManifest) one + // when IgnoreAddingMapKeys is configured. + if d.ignoreAddingMapKeys && !nextValueY.IsValid() { + continue + } + + nextPath := newMapPath(path, k.String()) + nextValueX := vx.MapIndex(k) + checks[k.String()] = struct{}{} + if err := d.diff(nextPath, nextValueX, nextValueY); err != nil { + return err + } + } + return nil +} + +func (d *differ) diffInterface(path []PathStep, vx, vy reflect.Value) error { + if isEmptyInterface(vx) && isEmptyInterface(vy) { + return nil + } + + if vx.IsNil() || vy.IsNil() { + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil + } + + vx, vy = vx.Elem(), vy.Elem() + return d.diff(path, vx, vy) +} + +func (d *differ) diffString(path []PathStep, vx, vy reflect.Value) error { + if vx.String() == vy.String() { + return nil + } + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil +} + +func (d *differ) diffBool(path []PathStep, vx, vy reflect.Value) error { + if vx.Bool() == vy.Bool() { + return nil + } + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil +} + +func (d *differ) diffNumber(path []PathStep, vx, vy reflect.Value) error { + if floatNumber(vx) == floatNumber(vy) { + return nil + } + + d.addNode(path, vx.Type(), vy.Type(), vx, vy) + return nil +} + +// isEmptyInterface reports whether v is nil or zero value or its element is an empty map, an empty slice. +func isEmptyInterface(v reflect.Value) bool { + if !v.IsValid() || v.IsNil() || v.IsZero() { + return true + } + if v.Kind() != reflect.Interface { + return false + } + + e := v.Elem() + + // When the value that the interface v contains is a zero value (false boolean, zero number, empty string...). + if e.IsZero() { + return true + } + + switch e.Kind() { + case reflect.Array, reflect.Slice, reflect.Map: + return e.Len() == 0 + default: + return false + } +} + +func floatNumber(v reflect.Value) float64 { + switch v.Kind() { + case reflect.Float32, reflect.Float64: + return v.Float() + default: + return float64(v.Int()) + } +} + +func isNumberValue(v reflect.Value) bool { + switch v.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Float32, reflect.Float64: + return true + default: + return false + } +} + +func convertToNumber(v reflect.Value) (reflect.Value, bool) { + switch v.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Float32, reflect.Float64: + return v, true + case reflect.String: + if n, err := strconv.ParseFloat(v.String(), 64); err == nil { + return reflect.ValueOf(n), true + } + return v, false + default: + return v, false + } +} + +func isBooleanValue(v reflect.Value) bool { + return v.Kind() == reflect.Bool +} + +func convertToBoolean(v reflect.Value) (reflect.Value, bool) { + if v.Kind() == reflect.String { + b, err := strconv.ParseBool(v.String()) + return reflect.ValueOf(b), err == nil + } + return v, false +} + +func newSlicePath(path []PathStep, index int) []PathStep { + next := make([]PathStep, len(path)) + copy(next, path) + next = append(next, PathStep{ + Type: SliceIndexPathStep, + SliceIndex: index, + }) + return next +} + +func newMapPath(path []PathStep, index string) []PathStep { + next := make([]PathStep, len(path)) + copy(next, path) + next = append(next, PathStep{ + Type: MapIndexPathStep, + MapIndex: index, + }) + return next +} + +func (d *differ) addNode(path []PathStep, tx, ty reflect.Type, vx, vy reflect.Value) { + if len(d.ignoredPaths) > 0 { + pathString := makePathString(path) + if d.isIgnoredPath(pathString) { + return + } + nvx := d.ignoredValue(vx, pathString) + nvy := d.ignoredValue(vy, pathString) + + d.result.addNode(path, tx, ty, nvx, nvy) + return + } + + d.result.addNode(path, tx, ty, vx, vy) +} + +func (d *differ) ignoredValue(v reflect.Value, prefix string) reflect.Value { + switch v.Kind() { + case reflect.Map: + nv := reflect.MakeMap(v.Type()) + keys := v.MapKeys() + for _, k := range keys { + nprefix := prefix + "." + k.String() + if d.isIgnoredPath(nprefix) { + continue + } + + sub := v.MapIndex(k) + filtered := d.ignoredValue(sub, nprefix) + if !filtered.IsValid() { + continue + } + nv.SetMapIndex(k, filtered) + } + return nv + + case reflect.Slice, reflect.Array: + nv := reflect.MakeSlice(v.Type(), 0, 0) + for i := 0; i < v.Len(); i++ { + nprefix := prefix + "." + strconv.Itoa(i) + if d.isIgnoredPath(nprefix) { + continue + } + + filtered := d.ignoredValue(v.Index(i), nprefix) + if !filtered.IsValid() { + continue + } + nv = reflect.Append(nv, filtered) + } + return nv + + case reflect.Interface: + nprefix := prefix + "." + v.String() + if d.isIgnoredPath(nprefix) { + return reflect.New(v.Type()) + } + return d.ignoredValue(v.Elem(), prefix) + + case reflect.String: + return v + + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return v + + case reflect.Float32, reflect.Float64: + return v + + default: + nprefix := prefix + "." + v.String() + if d.isIgnoredPath(nprefix) { + return reflect.New(v.Type()) + } + return v + } +} + +func (d *differ) isIgnoredPath(pathString string) bool { + var pathSubStr string + pathElms := strings.Split(pathString, ".") + + for i, path := range pathElms { + if i != 0 { + pathSubStr += "." + } + pathSubStr += path + if _, found := d.ignoredPaths[pathSubStr]; found { + return true + } + } + return false +} diff --git a/pkg/plugin/sdk/diff/diff_test.go b/pkg/plugin/sdk/diff/diff_test.go new file mode 100644 index 0000000000..f26816b9a0 --- /dev/null +++ b/pkg/plugin/sdk/diff/diff_test.go @@ -0,0 +1,322 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "os" + "reflect" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/yaml" +) + +func TestDiff(t *testing.T) { + testcases := []struct { + name string + yamlFile string + resourceKey string + options []Option + diffNum int + diffString string + }{ + { + name: "no diff", + yamlFile: "testdata/no_diff.yaml", + options: []Option{ + WithEquateEmpty(), + WithIgnoreAddingMapKeys(), + WithCompareNumberAndNumericString(), + WithCompareBooleanAndBooleanString(), + }, + diffNum: 0, + }, + { + name: "no diff by ignoring all adding map keys", + yamlFile: "testdata/ignore_adding_map_keys.yaml", + options: []Option{ + WithIgnoreAddingMapKeys(), + }, + diffNum: 0, + }, + { + name: "diff by ignoring specified field with correct key", + yamlFile: "testdata/has_diff.yaml", + resourceKey: "deployment-key", + options: []Option{ + WithIgnoreConfig( + map[string][]string{ + "deployment-key": { + "spec.replicas", + "spec.template.spec.containers.0.args.1", + "spec.template.spec.strategy.rollingUpdate.maxSurge", + "spec.template.spec.containers.3.livenessProbe.initialDelaySeconds", + }, + }, + ), + }, + diffNum: 6, + diffString: ` spec: + template: + metadata: + labels: + #spec.template.metadata.labels.app +- app: simple ++ app: simple2 + + #spec.template.metadata.labels.component +- component: foo + + spec: + containers: + - + #spec.template.spec.containers.1.image +- image: gcr.io/pipecd/helloworld:v2.0.0 ++ image: gcr.io/pipecd/helloworld:v2.1.0 + + - + #spec.template.spec.containers.2.image +- image: + + #spec.template.spec.containers.3 ++ - image: new-image ++ livenessProbe: ++ exec: ++ command: ++ - cat ++ - /tmp/healthy ++ name: foo + + #spec.template.spec.strategy ++ strategy: ++ rollingUpdate: ++ maxUnavailable: 25% ++ type: RollingUpdate + +`, + }, + { + name: "diff by ignoring specified field with wrong resource key", + yamlFile: "testdata/has_diff.yaml", + resourceKey: "deployment-key", + options: []Option{ + WithIgnoreConfig( + map[string][]string{ + "crd-key": { + "spec.replicas", + "spec.template.spec.containers.0.args.1", + "spec.template.spec.strategy.rollingUpdate.maxSurge", + "spec.template.spec.containers.3.livenessProbe.initialDelaySeconds", + }, + }, + ), + }, + diffNum: 8, + diffString: ` spec: + #spec.replicas +- replicas: 2 ++ replicas: 3 + + template: + metadata: + labels: + #spec.template.metadata.labels.app +- app: simple ++ app: simple2 + + #spec.template.metadata.labels.component +- component: foo + + spec: + containers: + - args: + #spec.template.spec.containers.0.args.1 +- - hello + + - + #spec.template.spec.containers.1.image +- image: gcr.io/pipecd/helloworld:v2.0.0 ++ image: gcr.io/pipecd/helloworld:v2.1.0 + + - + #spec.template.spec.containers.2.image +- image: + + #spec.template.spec.containers.3 ++ - image: new-image ++ livenessProbe: ++ exec: ++ command: ++ - cat ++ - /tmp/healthy ++ initialDelaySeconds: 5 ++ name: foo + + #spec.template.spec.strategy ++ strategy: ++ rollingUpdate: ++ maxSurge: 25% ++ maxUnavailable: 25% ++ type: RollingUpdate + +`, + }, + { + name: "has diff", + yamlFile: "testdata/has_diff.yaml", + diffNum: 8, + diffString: ` spec: + #spec.replicas +- replicas: 2 ++ replicas: 3 + + template: + metadata: + labels: + #spec.template.metadata.labels.app +- app: simple ++ app: simple2 + + #spec.template.metadata.labels.component +- component: foo + + spec: + containers: + - args: + #spec.template.spec.containers.0.args.1 +- - hello + + - + #spec.template.spec.containers.1.image +- image: gcr.io/pipecd/helloworld:v2.0.0 ++ image: gcr.io/pipecd/helloworld:v2.1.0 + + - + #spec.template.spec.containers.2.image +- image: + + #spec.template.spec.containers.3 ++ - image: new-image ++ livenessProbe: ++ exec: ++ command: ++ - cat ++ - /tmp/healthy ++ initialDelaySeconds: 5 ++ name: foo + + #spec.template.spec.strategy ++ strategy: ++ rollingUpdate: ++ maxSurge: 25% ++ maxUnavailable: 25% ++ type: RollingUpdate + +`, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + objs, err := loadUnstructureds(tc.yamlFile) + require.NoError(t, err) + require.Equal(t, 2, len(objs)) + + result, err := DiffUnstructureds(objs[0], objs[1], tc.resourceKey, tc.options...) + require.NoError(t, err) + assert.Equal(t, tc.diffNum, result.NumNodes()) + + renderer := NewRenderer(WithLeftPadding(1)) + ds := renderer.Render(result.Nodes()) + + assert.Equal(t, tc.diffString, ds) + }) + } +} + +func loadUnstructureds(path string) ([]unstructured.Unstructured, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + const separator = "\n---" + parts := strings.Split(string(data), separator) + out := make([]unstructured.Unstructured, 0, len(parts)) + + for _, part := range parts { + // Ignore all the cases where no content between separator. + part = strings.TrimSpace(part) + if len(part) == 0 { + continue + } + var obj unstructured.Unstructured + if err := yaml.Unmarshal([]byte(part), &obj); err != nil { + return nil, err + } + out = append(out, obj) + } + return out, nil +} + +func TestIsEmptyInterface(t *testing.T) { + testcases := []struct { + name string + v interface{} + expected bool + }{ + { + name: "nil", + v: nil, + expected: true, + }, + { + name: "nil map", + v: map[string]int(nil), + expected: true, + }, + { + name: "empty map", + v: map[string]int{}, + expected: true, + }, + { + name: "nil slice", + v: []int(nil), + expected: true, + }, + { + name: "empty slice", + v: []int{}, + expected: true, + }, + { + name: "number", + v: 1, + expected: false, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + s := []interface{}{tc.v} + v := reflect.ValueOf(s) + + got := isEmptyInterface(v.Index(0)) + assert.Equal(t, tc.expected, got) + }) + } +} diff --git a/pkg/plugin/sdk/diff/renderer.go b/pkg/plugin/sdk/diff/renderer.go new file mode 100644 index 0000000000..a5655aacea --- /dev/null +++ b/pkg/plugin/sdk/diff/renderer.go @@ -0,0 +1,230 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "fmt" + "reflect" + "sort" + "strconv" + "strings" +) + +type Renderer struct { + leftPadding int + maskPathPrefix string +} + +type RenderOption func(*Renderer) + +const maskString = "*****" + +func WithLeftPadding(p int) RenderOption { + return func(r *Renderer) { + r.leftPadding = p + } +} + +func WithMaskPath(prefix string) RenderOption { + return func(r *Renderer) { + r.maskPathPrefix = prefix + } +} + +func NewRenderer(opts ...RenderOption) *Renderer { + r := &Renderer{} + for _, opt := range opts { + opt(r) + } + return r +} + +func (r *Renderer) Render(ns Nodes) string { + if len(ns) == 0 { + return "" + } + + var prePath []PathStep + var b strings.Builder + + printValue := func(mark string, v reflect.Value, lastStep PathStep, depth int) { + if !v.IsValid() { + return + } + + nodeString, nl := renderNodeValue(v, "") + if lastStep.Type == SliceIndexPathStep { + nl = false + } + + switch { + case lastStep.Type == SliceIndexPathStep: + b.WriteString(fmt.Sprintf("%s%*s- ", mark, depth*2-1, "")) + case nl: + b.WriteString(fmt.Sprintf("%s%*s%s:\n", mark, depth*2-1, "", lastStep.String())) + default: + b.WriteString(fmt.Sprintf("%s%*s%s: ", mark, depth*2-1, "", lastStep.String())) + } + + parts := strings.Split(nodeString, "\n") + for i, p := range parts { + if lastStep.Type != SliceIndexPathStep { + if nl { + b.WriteString(fmt.Sprintf("%s%*s%s\n", mark, depth*2+1, "", p)) + } else { + b.WriteString(fmt.Sprintf("%s\n", p)) + } + continue + } + if i == 0 { + b.WriteString(fmt.Sprintf("%s\n", p)) + continue + } + b.WriteString(fmt.Sprintf("%s%*s%s\n", mark, depth*2+1, "", p)) + } + } + + for _, n := range ns { + duplicateDepth := pathDuplicateDepth(n.Path, prePath) + prePath = n.Path + pathLen := len(n.Path) + + var array bool + for i := duplicateDepth; i < pathLen-1; i++ { + if n.Path[i].Type == SliceIndexPathStep { + b.WriteString(fmt.Sprintf("%*s-", (r.leftPadding+i)*2, "")) + array = true + continue + } + if array { + b.WriteString(fmt.Sprintf(" %s:\n", n.Path[i].String())) + array = false + continue + } + b.WriteString(fmt.Sprintf("%*s%s:\n", (r.leftPadding+i)*2, "", n.Path[i].String())) + } + if array { + b.WriteString("\n") + } + + lastStep := n.Path[pathLen-1] + valueX, valueY := n.ValueX, n.ValueY + if r.maskPathPrefix != "" && strings.HasPrefix(n.PathString, r.maskPathPrefix) { + valueX = reflect.ValueOf(maskString) + valueY = reflect.ValueOf(maskString) + } + + b.WriteString(fmt.Sprintf("%*s#%s\n", (r.leftPadding+pathLen-1)*2, "", n.PathString)) + printValue("-", valueX, lastStep, r.leftPadding+pathLen-1) + printValue("+", valueY, lastStep, r.leftPadding+pathLen-1) + b.WriteString("\n") + } + + return b.String() +} + +func pathDuplicateDepth(x, y []PathStep) int { + minLen := len(x) + if minLen > len(y) { + minLen = len(y) + } + + for i := 0; i < minLen; i++ { + if x[i] == y[i] { + continue + } + return i + } + return 0 +} + +func renderNodeValue(v reflect.Value, prefix string) (string, bool) { + switch v.Kind() { + case reflect.Map: + out := make([]string, 0, v.Len()) + keys := v.MapKeys() + sort.Slice(keys, func(i, j int) bool { + return keys[i].String() < keys[j].String() + }) + for _, k := range keys { + sub := v.MapIndex(k) + subString, nl := renderNodeValue(sub, prefix+" ") + if !nl { + out = append(out, fmt.Sprintf("%s%s: %s", prefix, k.String(), subString)) + continue + } + out = append(out, fmt.Sprintf("%s%s:\n%s", prefix, k.String(), subString)) + } + if len(out) == 0 { + return "", false + } + return strings.Join(out, "\n"), true + + case reflect.Slice, reflect.Array: + out := make([]string, 0, v.Len()) + for i := 0; i < v.Len(); i++ { + sub, _ := renderNodeValue(v.Index(i), prefix+" ") + parts := strings.Split(sub, "\n") + for i, p := range parts { + p = strings.TrimPrefix(p, prefix+" ") + if i == 0 { + out = append(out, fmt.Sprintf("%s- %s", prefix, p)) + continue + } + out = append(out, fmt.Sprintf("%s %s", prefix, p)) + } + } + return strings.Join(out, "\n"), true + + case reflect.Interface: + return renderNodeValue(v.Elem(), prefix) + + case reflect.String: + return v.String(), false + + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return strconv.FormatInt(v.Int(), 10), false + + case reflect.Float32, reflect.Float64: + return strconv.FormatFloat(v.Float(), 'f', -1, 64), false + + case reflect.Bool: + return strconv.FormatBool(v.Bool()), false + + default: + return v.String(), false + } +} + +func RenderPrimitiveValue(v reflect.Value) string { + switch v.Kind() { + case reflect.String: + return v.String() + + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return strconv.FormatInt(v.Int(), 10) + + case reflect.Float32, reflect.Float64: + return strconv.FormatFloat(v.Float(), 'f', -1, 64) + + case reflect.Interface, reflect.Pointer: + return RenderPrimitiveValue(v.Elem()) + + default: + return v.String() + } +} diff --git a/pkg/plugin/sdk/diff/renderer_test.go b/pkg/plugin/sdk/diff/renderer_test.go new file mode 100644 index 0000000000..859d125653 --- /dev/null +++ b/pkg/plugin/sdk/diff/renderer_test.go @@ -0,0 +1,234 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "os" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRenderNodeValue(t *testing.T) { + var ( + mapOfPrimative = map[string]string{ + "one": "1", + "two": "2", + } + mapOfMap = map[string]interface{}{ + "one": map[string]string{ + "one": "1-1", + "two": "1-2", + }, + "two": map[string]string{ + "one": "2-1", + "two": "2-2", + }, + } + mapOfSlice = map[string]interface{}{ + "one": []string{"one-1", "one-2"}, + "two": []string{"two-1", "two-2"}, + } + mapOfBool = map[string]interface{}{ + "false": false, + "true": true, + } + ) + + testcases := []struct { + name string + value reflect.Value + expected string + }{ + { + name: "int value", + value: reflect.ValueOf(1), + expected: "1", + }, + { + name: "float value", + value: reflect.ValueOf(1.25), + expected: "1.25", + }, + { + name: "string value", + value: reflect.ValueOf("hello"), + expected: "hello", + }, + { + name: "bool value (true)", + value: reflect.ValueOf(true), + expected: "true", + }, + { + name: "bool value (false)", + value: reflect.ValueOf(false), + expected: "false", + }, + { + name: "slice of primitive elements", + value: func() reflect.Value { + v := []int{1, 2, 3} + return reflect.ValueOf(v) + }(), + expected: `- 1 +- 2 +- 3`, + }, + { + name: "slice of interface", + value: func() reflect.Value { + v := []interface{}{ + map[string]int{ + "1-one": 1, + "2-two": 2, + }, + map[string]int{ + "3-three": 3, + "4-four": 4, + }, + } + return reflect.ValueOf(v) + }(), + expected: `- 1-one: 1 + 2-two: 2 +- 3-three: 3 + 4-four: 4`, + }, + { + name: "simple map", + value: reflect.ValueOf(map[string]string{ + "one": "one-value", + "two": "two-value", + }), + expected: `one: one-value +two: two-value`, + }, + { + name: "nested map", + value: func() reflect.Value { + v := map[string]interface{}{ + "1-number": 1, + "2-string": "hello", + "3-map-of-primitive": mapOfPrimative, + "4-map-of-map": mapOfMap, + "5-map-of-slice": mapOfSlice, + "6-slice": []string{"a", "b"}, + "7-string": "hi", + } + return reflect.ValueOf(v) + }(), + expected: `1-number: 1 +2-string: hello +3-map-of-primitive: + one: 1 + two: 2 +4-map-of-map: + one: + one: 1-1 + two: 1-2 + two: + one: 2-1 + two: 2-2 +5-map-of-slice: + one: + - one-1 + - one-2 + two: + - two-1 + - two-2 +6-slice: + - a + - b +7-string: hi`, + }, + { + name: "map of bool", + value: reflect.ValueOf(mapOfBool), + expected: `false: false +true: true`, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + got, _ := renderNodeValue(tc.value, "") + assert.Equal(t, tc.expected, got) + }) + } +} + +func TestRenderNodeValueComplex(t *testing.T) { + // Complex node. Note that the keys in the yaml file must be in order. + objs, err := loadUnstructureds("testdata/complex-node.yaml") + require.NoError(t, err) + require.Equal(t, 1, len(objs)) + + root := reflect.ValueOf(objs[0].Object) + got, _ := renderNodeValue(root, "") + + data, err := os.ReadFile("testdata/complex-node.yaml") + require.NoError(t, err) + assert.Equal(t, string(data), got) +} + +func TestRenderPrimitiveValue(t *testing.T) { + testcases := []struct { + name string + value interface{} + expected string + }{ + { + name: "string", + value: "hello", + expected: "hello", + }, + { + name: "int", + value: 1, + expected: "1", + }, + { + name: "float", + value: 1.25, + expected: "1.25", + }, + { + name: "pointer to int", + value: func() *int { + v := 1 + return &v + }(), + expected: "1", + }, + { + name: "map", + value: map[string]int{ + "one": 1, + }, + expected: "", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + v := reflect.ValueOf(tc.value) + got := RenderPrimitiveValue(v) + assert.Equal(t, tc.expected, got) + }) + } +} diff --git a/pkg/plugin/sdk/diff/result.go b/pkg/plugin/sdk/diff/result.go new file mode 100644 index 0000000000..c3c0623a77 --- /dev/null +++ b/pkg/plugin/sdk/diff/result.go @@ -0,0 +1,155 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "errors" + "reflect" + "regexp" + "sort" + "strconv" + "strings" +) + +var ( + ErrNotFound = errors.New("not found") +) + +type Result struct { + nodes []Node +} + +func (r *Result) HasDiff() bool { + return len(r.nodes) > 0 +} + +func (r *Result) NumNodes() int { + return len(r.nodes) +} + +func (r *Result) Nodes() Nodes { + return r.nodes +} + +type Node struct { + Path []PathStep + PathString string + TypeX reflect.Type + TypeY reflect.Type + ValueX reflect.Value + ValueY reflect.Value +} + +type PathStepType string + +const ( + MapIndexPathStep PathStepType = "MapIndex" + SliceIndexPathStep PathStepType = "SliceIndex" +) + +type PathStep struct { + Type PathStepType + MapIndex string + SliceIndex int +} + +func (s PathStep) String() string { + switch s.Type { + case SliceIndexPathStep: + return strconv.FormatInt(int64(s.SliceIndex), 10) + case MapIndexPathStep: + return s.MapIndex + default: + return "" + } +} + +func (n Node) StringX() string { + return RenderPrimitiveValue(n.ValueX) +} + +func (n Node) StringY() string { + return RenderPrimitiveValue(n.ValueY) +} + +type Nodes []Node + +func (ns Nodes) FindOne(query string) (*Node, error) { + reg, err := regexp.Compile(query) + if err != nil { + return nil, err + } + + for i := range ns { + matched := reg.MatchString(ns[i].PathString) + if !matched { + continue + } + return &ns[i], nil + } + return nil, ErrNotFound +} + +func (ns Nodes) Find(query string) (Nodes, error) { + reg, err := regexp.Compile(query) + if err != nil { + return nil, err + } + + nodes := make([]Node, 0) + for i := range ns { + matched := reg.MatchString(ns[i].PathString) + if !matched { + continue + } + nodes = append(nodes, ns[i]) + } + return nodes, nil +} + +func (ns Nodes) FindByPrefix(prefix string) Nodes { + nodes := make([]Node, 0) + for i := range ns { + if strings.HasPrefix(ns[i].PathString, prefix) { + nodes = append(nodes, ns[i]) + } + } + return nodes +} + +func (r *Result) addNode(path []PathStep, typeX, typeY reflect.Type, valueX, valueY reflect.Value) { + r.nodes = append(r.nodes, Node{ + Path: path, + PathString: makePathString(path), + TypeX: typeX, + TypeY: typeY, + ValueX: valueX, + ValueY: valueY, + }) +} + +func (r *Result) sort() { + sort.Slice(r.nodes, func(i, j int) bool { + return r.nodes[i].PathString < r.nodes[j].PathString + }) +} + +func makePathString(path []PathStep) string { + steps := make([]string, 0, len(path)) + for _, s := range path { + steps = append(steps, s.String()) + } + return strings.Join(steps, ".") +} diff --git a/pkg/plugin/sdk/diff/result_test.go b/pkg/plugin/sdk/diff/result_test.go new file mode 100644 index 0000000000..c42eeb3578 --- /dev/null +++ b/pkg/plugin/sdk/diff/result_test.go @@ -0,0 +1,102 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFindOne(t *testing.T) { + nodes := []Node{ + {PathString: "spec.template.spec"}, + } + + testcases := []struct { + name string + nodes Nodes + query string + expected *Node + exepectedError error + }{ + { + name: "nil list", + query: ".+", + exepectedError: ErrNotFound, + }, + { + name: "not found", + nodes: nodes, + query: `spec\.not-found\..+`, + exepectedError: ErrNotFound, + }, + { + name: "found", + nodes: nodes, + query: `spec\.template\..+`, + expected: &nodes[0], + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + n, err := tc.nodes.FindOne(tc.query) + assert.Equal(t, tc.expected, n) + assert.Equal(t, tc.exepectedError, err) + }) + } +} + +func TestFind(t *testing.T) { + nodes := []Node{ + {PathString: "spec.replicas"}, + {PathString: "spec.template.spec.containers.0.image"}, + {PathString: "spec.template.spec.containers.1.image"}, + } + + testcases := []struct { + name string + nodes Nodes + query string + expected []Node + }{ + { + name: "nil list", + query: ".+", + expected: []Node{}, + }, + { + name: "not found", + nodes: nodes, + query: `spec\.not-found\..+`, + expected: []Node{}, + }, + { + name: "found two nodes", + nodes: nodes, + query: `spec\.template\.spec\.containers\.\d+.image$`, + expected: []Node{nodes[1], nodes[2]}, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ns, err := tc.nodes.Find(tc.query) + assert.Equal(t, Nodes(tc.expected), ns) + assert.NoError(t, err) + }) + } +} diff --git a/pkg/plugin/sdk/diff/testdata/complex-node.yaml b/pkg/plugin/sdk/diff/testdata/complex-node.yaml new file mode 100644 index 0000000000..a263b81a1c --- /dev/null +++ b/pkg/plugin/sdk/diff/testdata/complex-node.yaml @@ -0,0 +1,33 @@ +apiVersion: apps/v1 +kind: Foo +metadata: + labels: + app: simple + pipecd.dev/managed-by: piped + name: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - args: + - hi + - hello + image: gcr.io/pipecd/helloworld:v1.0.0 + name: helloworld + ports: + - containerPort: 9085 + - image: envoy:1.10.0 + livenessProbe: + exec: + command: + - cat + - /tmp/healthy + initialDelaySeconds: 5 + name: envoy \ No newline at end of file diff --git a/pkg/plugin/sdk/diff/testdata/has_diff.yaml b/pkg/plugin/sdk/diff/testdata/has_diff.yaml new file mode 100644 index 0000000000..4b18bdb355 --- /dev/null +++ b/pkg/plugin/sdk/diff/testdata/has_diff.yaml @@ -0,0 +1,71 @@ +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + app: simple + pipecd.dev/managed-by: piped +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + component: foo + spec: + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + - hello + ports: + - containerPort: 9085 + - name: bar + image: gcr.io/pipecd/helloworld:v2.0.0 + - name: helloword2 + image: "" +--- +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + pipecd.dev/managed-by: piped + app: simple +spec: + replicas: 3 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple2 + spec: + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + ports: + - containerPort: 9085 + - name: bar + image: gcr.io/pipecd/helloworld:v2.1.0 + - name: helloword2 + - name: foo + image: new-image + livenessProbe: + exec: + command: + - cat + - /tmp/healthy + initialDelaySeconds: 5 diff --git a/pkg/plugin/sdk/diff/testdata/ignore_adding_map_keys.yaml b/pkg/plugin/sdk/diff/testdata/ignore_adding_map_keys.yaml new file mode 100644 index 0000000000..b618e40768 --- /dev/null +++ b/pkg/plugin/sdk/diff/testdata/ignore_adding_map_keys.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + app: simple + pipecd.dev/managed-by: piped + pipecd.dev/resource-key: apps/v1:Foo:default:simple + pipecd.dev/variant: primary +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + newSliceFields: + - a + - b + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + - hello + ports: + - containerPort: 9085 +status: + desc: ok +--- +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + pipecd.dev/managed-by: piped + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + - hello + ports: + - containerPort: 9085 diff --git a/pkg/plugin/sdk/diff/testdata/no_diff.yaml b/pkg/plugin/sdk/diff/testdata/no_diff.yaml new file mode 100644 index 0000000000..6e2fd93610 --- /dev/null +++ b/pkg/plugin/sdk/diff/testdata/no_diff.yaml @@ -0,0 +1,92 @@ +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + app: simple + pipecd.dev/managed-by: piped + zeroBool1: false + zeroString1: "" + zeroInt1: 0 + zeroFloat1: 0.0 + booleanString1: "true" + booleanString2: true + booleanString3: "false" + booleanString4: false +spec: + replicas: 2 + number: 1 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + - hello + ports: + - containerPort: 9085 + # Zero map and nil map should be equal. + resources: + null + emptyList: + [] + emptyMap: {} + resources: + limits: + cpu: 1 + memory: 1Gi + requests: + cpu: 1 + memory: 1Gi +--- +apiVersion: apps/v1 +kind: Foo +metadata: + name: simple + labels: + pipecd.dev/managed-by: piped + app: simple + zeroBool2: false + zeroString2: "" + zeroInt2: 0 + zeroFloat2: 0.0 + booleanString1: true + booleanString2: "true" + booleanString3: false + booleanString4: "false" +spec: + replicas: 2 + number: 1.0 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - name: helloworld + image: gcr.io/pipecd/helloworld:v1.0.0 + args: + - hi + - hello + ports: + - containerPort: 9085 + # Zero map and nil map should be equal. + resources: {} + emptyList2: [] + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi diff --git a/pkg/plugin/sdk/logpersister/logpersistertest/logpersister.go b/pkg/plugin/sdk/logpersister/logpersistertest/logpersister.go new file mode 100644 index 0000000000..0da67a74e8 --- /dev/null +++ b/pkg/plugin/sdk/logpersister/logpersistertest/logpersister.go @@ -0,0 +1,64 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpersistertest + +import ( + "testing" + "time" + + "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" +) + +// NewTestLogPersister creates a new testLogPersister for testing. +func NewTestLogPersister(t *testing.T) TestLogPersister { + return TestLogPersister{t} +} + +// TestLogPersister implements logpersister for testing. +type TestLogPersister struct { + t *testing.T +} + +func (lp TestLogPersister) StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister { + return lp +} + +func (lp TestLogPersister) Write(log []byte) (int, error) { + // Write the log to the test logger + lp.t.Log(string(log)) + return 0, nil +} +func (lp TestLogPersister) Info(log string) { + lp.t.Log("INFO", log) +} +func (lp TestLogPersister) Infof(format string, a ...interface{}) { + lp.t.Logf("INFO "+format, a...) +} +func (lp TestLogPersister) Success(log string) { + lp.t.Log("SUCCESS", log) +} +func (lp TestLogPersister) Successf(format string, a ...interface{}) { + lp.t.Logf("SUCCESS "+format, a...) +} +func (lp TestLogPersister) Error(log string) { + lp.t.Log("ERROR", log) +} +func (lp TestLogPersister) Errorf(format string, a ...interface{}) { + lp.t.Logf("ERROR "+format, a...) +} +func (lp TestLogPersister) Complete(timeout time.Duration) error { + lp.t.Logf("Complete stage log persister with timeout: %v", timeout) + return nil +} diff --git a/pkg/plugin/sdk/logpersister/persister.go b/pkg/plugin/sdk/logpersister/persister.go new file mode 100644 index 0000000000..86234d979d --- /dev/null +++ b/pkg/plugin/sdk/logpersister/persister.go @@ -0,0 +1,210 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package logpersister provides a piped component +// that enqueues all log blocks from running stages +// and then periodically sends to the control plane. +package logpersister + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/pipe-cd/pipecd/pkg/model" + service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" +) + +type apiClient interface { + ReportStageLogs(ctx context.Context, in *service.ReportStageLogsRequest, opts ...grpc.CallOption) (*service.ReportStageLogsResponse, error) + ReportStageLogsFromLastCheckpoint(ctx context.Context, in *service.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*service.ReportStageLogsFromLastCheckpointResponse, error) +} + +// Persister is responsible for saving the stage logs into server's storage. +// It sends the logs to the piped plugin-api grpc server through the apiClient. +type Persister interface { + Run(ctx context.Context) error + StageLogPersister(deploymentID, stageID string) StageLogPersister +} + +// StageLogPersister is a child persister instance for a specific stage. +// Use this to persist the stage logs and make it viewable on the UI. +type StageLogPersister interface { + Write(log []byte) (int, error) + Info(log string) + Infof(format string, a ...interface{}) + Success(log string) + Successf(format string, a ...interface{}) + Error(log string) + Errorf(format string, a ...interface{}) + Complete(timeout time.Duration) error +} + +type key struct { + DeploymentID string + StageID string +} + +type persister struct { + apiClient apiClient + stagePersisters sync.Map + + flushInterval time.Duration + checkpointFlushInterval time.Duration + stalePeriod time.Duration + gracePeriod time.Duration + logger *zap.Logger +} + +// NewPersister creates a new persister instance for saving the stage logs into server's storage. +// This controls how many concurent api calls should be executed and when to flush the logs. +func NewPersister(apiClient apiClient, logger *zap.Logger) *persister { + return &persister{ + apiClient: apiClient, + flushInterval: 5 * time.Second, + checkpointFlushInterval: 2 * time.Minute, + stalePeriod: time.Minute, + gracePeriod: 30 * time.Second, + logger: logger.Named("log-persister"), + } +} + +// Run starts running workers to flush logs to server. +func (p *persister) Run(ctx context.Context) error { + p.logger.Info("start running log persister") + ticker := time.NewTicker(p.flushInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + p.flush(ctx) + + case <-ctx.Done(): + p.shutdown() + return nil + } + } +} + +func (p *persister) shutdown() { + p.logger.Info("flush all logs before stopping") + ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod) + defer cancel() + p.flushAll(ctx) + + p.logger.Info("log persister has been stopped") +} + +// StageLogPersister creates a child persister instance for a specific stage. +func (p *persister) StageLogPersister(deploymentID, stageID string) StageLogPersister { + k := key{ + DeploymentID: deploymentID, + StageID: stageID, + } + logger := p.logger.With( + zap.String("deployment-id", deploymentID), + zap.String("stage-id", stageID), + ) + sp := &stageLogPersister{ + key: k, + curLogIndex: time.Now().Unix(), + doneCh: make(chan struct{}), + checkpointFlushInterval: p.checkpointFlushInterval, + persister: p, + logger: logger, + } + + p.stagePersisters.Store(k, sp) + return sp +} + +func (p *persister) flush(ctx context.Context) (flushes, deletes int) { + completedKeys := make([]key, 0) + + // Check new log entries and flush them if needed. + p.stagePersisters.Range(func(_, v interface{}) bool { + sp := v.(*stageLogPersister) + + if sp.isStale(p.stalePeriod) { + completedKeys = append(completedKeys, sp.key) + deletes++ + return true + } + + sp.flush(ctx) + flushes++ + return true + }) + + // Clean up all completed stage persisters. + for _, k := range completedKeys { + p.stagePersisters.Delete(k) + } + + return +} + +func (p *persister) flushAll(ctx context.Context) int { + var num = 0 + + p.stagePersisters.Range(func(_, v interface{}) bool { + sp := v.(*stageLogPersister) + if !sp.isStale(p.stalePeriod) { + num++ + go sp.flushFromLastCheckpoint(ctx) + } + return true + }) + + p.logger.Info(fmt.Sprintf("flushing all of %d stage persisters", num)) + return num +} + +func (p *persister) reportStageLogs(ctx context.Context, k key, blocks []*model.LogBlock) error { + req := &service.ReportStageLogsRequest{ + DeploymentId: k.DeploymentID, + StageId: k.StageID, + Blocks: blocks, + } + if _, err := p.apiClient.ReportStageLogs(ctx, req); err != nil { + p.logger.Error("failed to report stage logs", + zap.Any("key", k), + zap.Error(err), + ) + return err + } + return nil +} + +func (p *persister) reportStageLogsFromLastCheckpoint(ctx context.Context, k key, blocks []*model.LogBlock, completed bool) error { + req := &service.ReportStageLogsFromLastCheckpointRequest{ + DeploymentId: k.DeploymentID, + StageId: k.StageID, + Blocks: blocks, + Completed: completed, + } + if _, err := p.apiClient.ReportStageLogsFromLastCheckpoint(ctx, req); err != nil { + p.logger.Error("failed to report stage logs from last checkpoint", + zap.Any("key", k), + zap.Error(err), + ) + return err + } + return nil +} diff --git a/pkg/plugin/sdk/logpersister/persister_test.go b/pkg/plugin/sdk/logpersister/persister_test.go new file mode 100644 index 0000000000..73d4bb4caf --- /dev/null +++ b/pkg/plugin/sdk/logpersister/persister_test.go @@ -0,0 +1,91 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpersister + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/zap" + "google.golang.org/grpc" + + service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" +) + +type fakeAPIClient struct { + reportStageLogsCount atomic.Uint32 + reportStageLogsFromLastCheckpointCount atomic.Uint32 +} + +func (c *fakeAPIClient) ReportStageLogs(ctx context.Context, in *service.ReportStageLogsRequest, opts ...grpc.CallOption) (*service.ReportStageLogsResponse, error) { + c.reportStageLogsCount.Inc() + return &service.ReportStageLogsResponse{}, nil +} + +func (c *fakeAPIClient) ReportStageLogsFromLastCheckpoint(ctx context.Context, in *service.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*service.ReportStageLogsFromLastCheckpointResponse, error) { + c.reportStageLogsFromLastCheckpointCount.Inc() + return &service.ReportStageLogsFromLastCheckpointResponse{}, nil +} + +func (c *fakeAPIClient) NumberOfReportStageLogs() int { + return int(c.reportStageLogsCount.Load()) +} + +func (c *fakeAPIClient) NumberOfReportStageLogsFromLastCheckpoint() int { + return int(c.reportStageLogsFromLastCheckpointCount.Load()) +} + +func TestPersister(t *testing.T) { + t.Parallel() + + apiClient := &fakeAPIClient{} + p := NewPersister(apiClient, zap.NewNop()) + p.stalePeriod = 0 + + flushes, deletes := p.flush(context.TODO()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogs()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogsFromLastCheckpoint()) + assert.Equal(t, 0, flushes) + assert.Equal(t, 0, deletes) + + num := p.flushAll(context.TODO()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogs()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogsFromLastCheckpoint()) + assert.Equal(t, 0, num) + + sp1 := p.StageLogPersister("deployment-1", "stage-1") + p.StageLogPersister("deployment-2", "stage-2") + + num = p.flushAll(context.TODO()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogs()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogsFromLastCheckpoint()) + assert.Equal(t, 2, num) + + sp1.Complete(0) + + flushes, deletes = p.flush(context.TODO()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogs()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogsFromLastCheckpoint()) + assert.Equal(t, 1, flushes) + assert.Equal(t, 1, deletes) + + num = p.flushAll(context.TODO()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogs()) + require.Equal(t, 0, apiClient.NumberOfReportStageLogsFromLastCheckpoint()) + assert.Equal(t, 1, num) +} diff --git a/pkg/plugin/sdk/logpersister/stagelogpersister.go b/pkg/plugin/sdk/logpersister/stagelogpersister.go new file mode 100644 index 0000000000..dc40c8c799 --- /dev/null +++ b/pkg/plugin/sdk/logpersister/stagelogpersister.go @@ -0,0 +1,209 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpersister + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/pipe-cd/pipecd/pkg/model" +) + +// stageLogPersister represents a log persister for a specific stage. +type stageLogPersister struct { + key key + blocks []*model.LogBlock + curLogIndex int64 + completed bool + completedAt time.Time + // Mutex to protect the fields above. + mu sync.RWMutex + + sentIndex int + checkpointSentTimestamp time.Time + done atomic.Bool + doneCh chan struct{} + + checkpointFlushInterval time.Duration + persister *persister + logger *zap.Logger +} + +// append appends a new log block. +func (sp *stageLogPersister) append(log string, s model.LogSeverity) { + now := time.Now() + + // We also send the error logs to the local logger. + if s == model.LogSeverity_ERROR { + sp.logger.Warn(fmt.Sprintf("STAGE ERROR LOG: %s", log)) + } + + sp.mu.Lock() + defer sp.mu.Unlock() + + sp.curLogIndex++ + sp.blocks = append(sp.blocks, &model.LogBlock{ + Index: sp.curLogIndex, + Log: log, + Severity: s, + CreatedAt: now.Unix(), + }) +} + +// Write appends a new INFO log block. +func (sp *stageLogPersister) Write(log []byte) (int, error) { + sp.Info(string(log)) + return len(log), nil +} + +// Info appends a new INFO log block. +func (sp *stageLogPersister) Info(log string) { + sp.append(log, model.LogSeverity_INFO) +} + +// Infof formats and appends a new INFO log block. +func (sp *stageLogPersister) Infof(format string, a ...interface{}) { + sp.append(fmt.Sprintf(format, a...), model.LogSeverity_INFO) +} + +// Success appends a new SUCCESS log block. +func (sp *stageLogPersister) Success(log string) { + sp.append(log, model.LogSeverity_SUCCESS) +} + +// Successf formats and appends a new SUCCESS log block. +func (sp *stageLogPersister) Successf(format string, a ...interface{}) { + sp.append(fmt.Sprintf(format, a...), model.LogSeverity_SUCCESS) +} + +// Error appends a new ERROR log block. +func (sp *stageLogPersister) Error(log string) { + sp.append(log, model.LogSeverity_ERROR) +} + +// Errorf formats and appends a new ERROR log block. +func (sp *stageLogPersister) Errorf(format string, a ...interface{}) { + sp.append(fmt.Sprintf(format, a...), model.LogSeverity_ERROR) +} + +// Complete marks the completion of logging for this stage. +// This means no more log for this stage will be added into this persister. +func (sp *stageLogPersister) Complete(timeout time.Duration) error { + sp.mu.Lock() + sp.completed = true + sp.completedAt = time.Now() + sp.mu.Unlock() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-timer.C: + return fmt.Errorf("timed out") + + case <-sp.doneCh: + return nil + } +} + +func (sp *stageLogPersister) isStale(period time.Duration) bool { + if sp.done.Load() { + return true + } + + sp.mu.RLock() + defer sp.mu.RUnlock() + + if sp.completed && time.Since(sp.completedAt) > period { + return true + } + return false +} + +// flush sends the new log blocks or all of the blocks from the checkpoint +// based on the elapsed time. +// By design this flush function for a specific stageLogPersister will not be called concurrently. +func (sp *stageLogPersister) flush(ctx context.Context) error { + // Do nothing when this persister is already done. + if sp.done.Load() { + return nil + } + + sp.mu.RLock() + completed := sp.completed + sp.mu.RUnlock() + + if completed || time.Since(sp.checkpointSentTimestamp) > sp.checkpointFlushInterval { + sp.checkpointSentTimestamp = time.Now() + return sp.flushFromLastCheckpoint(ctx) + } + + return sp.flushNewLogs(ctx) +} + +func (sp *stageLogPersister) flushNewLogs(ctx context.Context) error { + sp.mu.RLock() + blocks := sp.blocks[sp.sentIndex:] + sp.mu.RUnlock() + + numBlocks := len(blocks) + if numBlocks == 0 { + return nil + } + + if err := sp.persister.reportStageLogs(ctx, sp.key, blocks); err != nil { + return err + } + + // Update sentIndex. + sp.sentIndex += numBlocks + return nil +} + +func (sp *stageLogPersister) flushFromLastCheckpoint(ctx context.Context) (err error) { + sp.mu.RLock() + blocks := sp.blocks + completed := sp.completed + sp.mu.RUnlock() + + defer func() { + if err == nil && completed { + sp.done.Store(true) + close(sp.doneCh) + } + }() + + numBlocks := len(blocks) + if numBlocks == 0 { + return nil + } + + if err := sp.persister.reportStageLogsFromLastCheckpoint(ctx, sp.key, blocks, completed); err != nil { + return err + } + + // Remove all sent blocks and update checkpointSentIndex. + sp.mu.Lock() + sp.blocks = sp.blocks[numBlocks:] + sp.mu.Unlock() + + sp.sentIndex = 0 + return nil +} diff --git a/pkg/plugin/sdk/pipedapi/client.go b/pkg/plugin/sdk/pipedapi/client.go new file mode 100644 index 0000000000..b326cd582b --- /dev/null +++ b/pkg/plugin/sdk/pipedapi/client.go @@ -0,0 +1,55 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipedapi + +import ( + "context" + "slices" + + "google.golang.org/grpc" + + service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" + "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" +) + +type PluginServiceClient struct { + service.PluginServiceClient + conn *grpc.ClientConn +} + +func NewClient(ctx context.Context, address string, opts ...rpcclient.DialOption) (*PluginServiceClient, error) { + // Clone the opts to avoid modifying the original opts slice. + opts = slices.Clone(opts) + + // Append the required options. + // The WithBlock option is required to make the client wait until the connection is up. + // The WithInsecure option is required to disable the transport security. + // The piped service does not require transport security because it is only used in localhost. + opts = append(opts, rpcclient.WithBlock(), rpcclient.WithInsecure()) + + conn, err := rpcclient.DialContext(ctx, address, opts...) + if err != nil { + return nil, err + } + + return &PluginServiceClient{ + PluginServiceClient: service.NewPluginServiceClient(conn), + conn: conn, + }, nil +} + +func (c *PluginServiceClient) Close() error { + return c.conn.Close() +} diff --git a/pkg/plugin/sdk/signalhandler/handler.go b/pkg/plugin/sdk/signalhandler/handler.go new file mode 100644 index 0000000000..3037e2a036 --- /dev/null +++ b/pkg/plugin/sdk/signalhandler/handler.go @@ -0,0 +1,47 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalhandler + +import ( + "context" + "os" + "os/signal" + "syscall" + + "go.uber.org/atomic" +) + +var ( + terminated atomic.Bool + + signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM} +) + +func init() { + // Listen for termination signals. + // When a termination signal is received, the signal handler will set the terminated flag to true. + ctx, cancel := signal.NotifyContext(context.Background(), signals...) + go func() { + defer cancel() + <-ctx.Done() + terminated.Store(true) + }() +} + +// Terminated returns true if the signal handler has received a termination signal. +// The termination signals are sent by the piped when it wants to stop running gracefully. +func Terminated() bool { + return terminated.Load() +} diff --git a/pkg/plugin/sdk/toolregistry/toolregistry.go b/pkg/plugin/sdk/toolregistry/toolregistry.go new file mode 100644 index 0000000000..d9cdb56ecc --- /dev/null +++ b/pkg/plugin/sdk/toolregistry/toolregistry.go @@ -0,0 +1,45 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolregistry + +import ( + "context" + + service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" +) + +type ToolRegistry struct { + client service.PluginServiceClient +} + +func NewToolRegistry(client service.PluginServiceClient) *ToolRegistry { + return &ToolRegistry{ + client: client, + } +} + +func (r *ToolRegistry) InstallTool(ctx context.Context, name, version, script string) (path string, err error) { + res, err := r.client.InstallTool(ctx, &service.InstallToolRequest{ + Name: name, + Version: version, + InstallScript: script, + }) + + if err != nil { + return "", err + } + + return res.GetInstalledPath(), nil +} diff --git a/pkg/plugin/sdk/toolregistry/toolregistrytest/toolregistrytest.go b/pkg/plugin/sdk/toolregistry/toolregistrytest/toolregistrytest.go new file mode 100644 index 0000000000..f6d3ffae51 --- /dev/null +++ b/pkg/plugin/sdk/toolregistry/toolregistrytest/toolregistrytest.go @@ -0,0 +1,110 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolregistrytest + +import ( + "bytes" + "context" + "os" + "os/exec" + "runtime" + "testing" + "text/template" + + "google.golang.org/grpc" + + "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" + "github.com/pipe-cd/pipecd/pkg/plugin/toolregistry" +) + +type templateValues struct { + Name string + Version string + OutPath string + TmpDir string + Arch string + Os string +} + +type fakeClient struct { + pipedservice.PluginServiceClient + testingT *testing.T +} + +func (c *fakeClient) binDir() (string, error) { + target := c.testingT.TempDir() + "/bin" + if err := os.MkdirAll(target, 0o755); err != nil { + return "", err + } + return target, nil +} + +func (c *fakeClient) outPath() string { + return c.testingT.TempDir() + "/out" +} + +func (c *fakeClient) InstallTool(ctx context.Context, in *pipedservice.InstallToolRequest, opts ...grpc.CallOption) (*pipedservice.InstallToolResponse, error) { + outPath := c.outPath() + + binDir, err := c.binDir() + if err != nil { + return nil, err + } + + t, err := template.New("install script").Parse(in.GetInstallScript()) + if err != nil { + return nil, err + } + + vars := templateValues{ + Name: in.GetName(), + Version: in.GetVersion(), + OutPath: outPath, + TmpDir: c.testingT.TempDir(), + Arch: runtime.GOARCH, + Os: runtime.GOOS, + } + var buf bytes.Buffer + if err := t.Execute(&buf, vars); err != nil { + return nil, err + } + + cmd := exec.CommandContext(ctx, "/bin/sh", "-c", buf.String()) + if out, err := cmd.CombinedOutput(); err != nil { + c.testingT.Log(string(out)) + return nil, err + } + + if err := os.Chmod(outPath, 0o755); err != nil { + return nil, err + } + + target := binDir + "/" + in.GetName() + "-" + in.GetVersion() + if out, err := exec.CommandContext(ctx, "/bin/sh", "-c", "mv "+outPath+" "+target).CombinedOutput(); err != nil { + c.testingT.Log(string(out)) + return nil, err + } + + return &pipedservice.InstallToolResponse{ + InstalledPath: target, + }, nil +} + +// NewTestToolRegistry returns a new instance of ToolRegistry for testing purpose. +func NewTestToolRegistry(t *testing.T) *toolregistry.ToolRegistry { + return toolregistry.NewToolRegistry(&fakeClient{ + testingT: t, + }) +}