Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Fix/dropped errors #27

Merged
merged 5 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
### Fixed
- Make waiting for locks more stable
- Don't drop k8sExe stderr output ([#27])

## [v0.2.0] 2020-05-29
### Changed
Expand Down Expand Up @@ -138,3 +139,5 @@ compatibility with older operator versions. Changes to the design contain:
[v0.0.3]: https://git.vshn.net/vshn/wrestic/compare/v0.0.2...v0.0.3
[v0.0.2]: https://git.vshn.net/vshn/wrestic/compare/v0.0.1...v0.0.2
[v0.0.1]: https://git.vshn.net/vshn/wrestic/tree/v0.0.1

[#27]: https://github.com/vshn/wrestic/pull/27
5 changes: 3 additions & 2 deletions cmd/wrestic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -161,9 +162,9 @@ func run(resticCLI *restic.Restic, mainLogger logr.Logger) error {

if err == nil {
for _, pod := range podList {
data, stdErr, err := kubernetes.PodExec(pod, mainLogger)
data, err := kubernetes.PodExec(pod, mainLogger)
if err != nil {
mainLogger.Error(fmt.Errorf("error occured during data stream from k8s"), stdErr.String())
mainLogger.Error(errors.New("error occured during data stream from k8s"), "pod execution was interrupted")
return err
}
filename := fmt.Sprintf("/%s-%s", os.Getenv(restic.Hostname), pod.ContainerName)
Expand Down
17 changes: 8 additions & 9 deletions kubernetes/pod_exec.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package kubernetes

import (
"bytes"
"fmt"
"io"
"strings"

"github.com/firepear/qsplit"
"github.com/go-logr/logr"
"github.com/vshn/wrestic/logging"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand All @@ -21,14 +21,14 @@ type ExecData struct {

// PodExec sends the command to the specified pod
// and returns a bytes buffer with the stdout
func PodExec(pod BackupPod, log logr.Logger) (*ExecData, *bytes.Buffer, error) {
func PodExec(pod BackupPod, log logr.Logger) (*ExecData, error) {

execLogger := log.WithName("k8sExec")

config, _ := getClientConfig()
k8sclient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, fmt.Errorf("can't create k8s for exec: %v", err)
return nil, fmt.Errorf("can't create k8s for exec: %w", err)
}

req := k8sclient.CoreV1().RESTClient().Post().
Expand All @@ -38,7 +38,7 @@ func PodExec(pod BackupPod, log logr.Logger) (*ExecData, *bytes.Buffer, error) {
SubResource("exec")
scheme := runtime.NewScheme()
if err := apiv1.AddToScheme(scheme); err != nil {
return nil, nil, fmt.Errorf("can't add runtime scheme: %v", err)
return nil, fmt.Errorf("can't add runtime scheme: %w", err)
}

command := qsplit.ToStrings([]byte(pod.Command))
Expand All @@ -56,25 +56,24 @@ func PodExec(pod BackupPod, log logr.Logger) (*ExecData, *bytes.Buffer, error) {

exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return nil, nil, err
return nil, err
}

var stderr bytes.Buffer
var stdoutReader, stdoutWriter = io.Pipe()
done := make(chan bool, 1)
go func() {
err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: stdoutWriter,
Stderr: &stderr,
Stderr: logging.NewErrorWriter(log.WithName(pod.PodName)),
Tty: false,
})

defer stdoutWriter.Close()
done <- true

if err != nil {
execLogger.Error(err, "error ocurred stream backup data", "namespace", pod.Namespace, "pod", pod.PodName)
execLogger.Error(err, "streaming data failed", "namespace", pod.Namespace, "pod", pod.PodName)
return
}
}()
Expand All @@ -84,5 +83,5 @@ func PodExec(pod BackupPod, log logr.Logger) (*ExecData, *bytes.Buffer, error) {
Reader: stdoutReader,
}

return data, &stderr, nil
return data, nil
}
162 changes: 162 additions & 0 deletions logging/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package logging

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"time"

"github.com/go-logr/logr"
)

type BackupSummary struct {
MessageType string `json:"message_type"`
FilesNew int `json:"files_new"`
FilesChanged int `json:"files_changed"`
FilesUnmodified int `json:"files_unmodified"`
DirsNew int `json:"dirs_new"`
DirsChanged int `json:"dirs_changed"`
DirsUnmodified int `json:"dirs_unmodified"`
DataBlobs int `json:"data_blobs"`
TreeBlobs int `json:"tree_blobs"`
DataAdded int64 `json:"data_added"`
TotalFilesProcessed int `json:"total_files_processed"`
TotalBytesProcessed int `json:"total_bytes_processed"`
TotalDuration float64 `json:"total_duration"`
SnapshotID string `json:"snapshot_id"`
}

type BackupEnvelope struct {
MessageType string `json:"message_type,omitempty"`
BackupStatus
BackupSummary
BackupError
}

type BackupStatus struct {
PercentDone float64 `json:"percent_done"`
TotalFiles int `json:"total_files"`
FilesDone int `json:"files_done"`
TotalBytes int `json:"total_bytes"`
BytesDone int `json:"bytes_done"`
CurrentFiles []string `json:"current_files"`
ErrorCount int `json:"error_count"`
}

// SummaryFunc takes the summed up status of the backup and will process this further like
// logging, metrics and webhooks.
type SummaryFunc func(summary BackupSummary, errorCount int, folder string, startTimestamp, endTimestamp int64)

type BackupOutputParser struct {
log logr.Logger
errorCount int
lineCounter int
summaryfunc SummaryFunc
folder string
}

type BackupError struct {
Error struct {
Op string `json:"Op"`
Path string `json:"Path"`
Err int `json:"Err"`
} `json:"error"`
During string `json:"during"`
Item string `json:"item"`
}

type outFunc func(string) error

// New creates a writer which directly writes to the given logger function.
func New(out outFunc) io.Writer {
return &writer{out}
}

// NewInfoWriter creates a writer which directly writes to the given logger using info level.
// It ensures that each line is handled seperately. This avoids mangled lines when parsing
// JSON outputs.
func NewInfoWriter(l logr.InfoLogger) io.Writer {
return New((&LogInfoPrinter{l}).out)
}

// NewInfoWriter creates a writer which directly writes to the given logger using error level.
// It ensures that each line is handled seperately. This avoids mangled lines when parsing
// JSON outputs.
func NewErrorWriter(l logr.Logger) io.Writer {
return New((&LogErrPrinter{l}).out)
}

type writer struct {
out outFunc
}

func (w writer) Write(p []byte) (int, error) {

scanner := bufio.NewScanner(bytes.NewReader(p))

for scanner.Scan() {
err := w.out(scanner.Text())
if err != nil {
return len(p), err
}
}

return len(p), nil
}

type LogInfoPrinter struct {
log logr.InfoLogger
}

func (l *LogInfoPrinter) out(s string) error {
l.log.Info(s)
return nil
}

type LogErrPrinter struct {
Log logr.Logger
}

func (l *LogErrPrinter) out(s string) error {
l.Log.Error(fmt.Errorf("error during command"), s)
return nil
}

func NewBackupOutputParser(logger logr.Logger, folderName string, summaryfunc SummaryFunc) io.Writer {
bop := &BackupOutputParser{
log: logger,
folder: folderName,
summaryfunc: summaryfunc,
}
return New(bop.out)
}

func (b *BackupOutputParser) out(s string) error {
envelope := &BackupEnvelope{}

err := json.Unmarshal([]byte(s), envelope)
if err != nil {
b.log.Error(err, "can't decode restic json output", "string", s)
return err
}

switch envelope.MessageType {
case "error":
b.errorCount++
b.log.Error(fmt.Errorf("error occurred during backup"), envelope.Item+" during "+envelope.During+" "+envelope.Error.Op)
case "status":
// Restic does the json output with 60hz, which is a bit much...
if b.lineCounter%60 == 0 {
percent := envelope.PercentDone * 100
b.log.Info("progress of backup", "percentage", fmt.Sprintf("%.2f%%", percent))
}
b.lineCounter++
case "summary":
b.log.Info("backup finished", "new files", envelope.FilesNew, "changed files", envelope.FilesChanged, "errors", b.errorCount)
b.log.Info("stats", "time", envelope.TotalDuration, "bytes added", envelope.DataAdded, "bytes processed", envelope.TotalBytesProcessed)
b.summaryfunc(envelope.BackupSummary, b.errorCount, b.folder, 1, time.Now().Unix())
}
return nil
}
55 changes: 3 additions & 52 deletions restic/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,9 @@ import (

"github.com/go-logr/logr"
"github.com/vshn/wrestic/kubernetes"
"github.com/vshn/wrestic/logging"
)

type backupEnvelope struct {
MessageType string `json:"message_type,omitempty"`
backupStatus
backupSummary
backupError
}

type backupStatus struct {
PercentDone float64 `json:"percent_done"`
TotalFiles int `json:"total_files"`
FilesDone int `json:"files_done"`
TotalBytes int `json:"total_bytes"`
BytesDone int `json:"bytes_done"`
CurrentFiles []string `json:"current_files"`
ErrorCount int `json:"error_count"`
}

type backupSummary struct {
MessageType string `json:"message_type"`
FilesNew int `json:"files_new"`
FilesChanged int `json:"files_changed"`
FilesUnmodified int `json:"files_unmodified"`
DirsNew int `json:"dirs_new"`
DirsChanged int `json:"dirs_changed"`
DirsUnmodified int `json:"dirs_unmodified"`
DataBlobs int `json:"data_blobs"`
TreeBlobs int `json:"tree_blobs"`
DataAdded int64 `json:"data_added"`
TotalFilesProcessed int `json:"total_files_processed"`
TotalBytesProcessed int `json:"total_bytes_processed"`
TotalDuration float64 `json:"total_duration"`
SnapshotID string `json:"snapshot_id"`
}

type backupError struct {
Error struct {
Op string `json:"Op"`
Path string `json:"Path"`
Err int `json:"Err"`
} `json:"error"`
During string `json:"during"`
Item string `json:"item"`
}

// Backup backup to the repository. It will loop through all subfolders of
// backupdir and trigger a snapshot for each of them.
func (r *Restic) Backup(backupDir string, tags ArrayOpts) error {
Expand Down Expand Up @@ -116,17 +73,11 @@ func (r *Restic) newParseBackupOutput(log logr.Logger, folder string) io.Writer

progressLogger := log.WithName("progress")

return &outputWrapper{
parser: &backupOutputParser{
folder: folder,
log: progressLogger,
summaryfunc: r.sendBackupStats,
},
}
return logging.NewBackupOutputParser(progressLogger, folder, r.sendBackupStats)

}

func (r *Restic) sendBackupStats(summary backupSummary, errorCount int, folder string, startTimestamp, endTimestamp int64) {
func (r *Restic) sendBackupStats(summary logging.BackupSummary, errorCount int, folder string, startTimestamp, endTimestamp int64) {

metrics := r.parseSummary(summary, errorCount, folder, 1, time.Now().Unix())

Expand Down
14 changes: 4 additions & 10 deletions restic/check.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package restic

import "github.com/vshn/wrestic/logging"

// Check will check the repository for errors
func (r *Restic) Check() error {
checklogger := r.logger.WithName("check")
Expand All @@ -11,16 +13,8 @@ func (r *Restic) Check() error {
Args: []string{
"check",
},
StdOut: &outputWrapper{
parser: &logOutParser{
log: checklogger.WithName("restic"),
},
},
StdErr: &outputWrapper{
parser: &logErrParser{
log: checklogger.WithName("restic"),
},
},
StdOut: logging.NewInfoWriter(checklogger.WithName("restic")),
StdErr: logging.NewErrorWriter(checklogger.WithName("restic")),
}

cmd := NewCommand(r.ctx, checklogger, opts)
Expand Down
Loading