forked from k3s-io/kine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx.go
128 lines (110 loc) · 3.13 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package generic
import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/k3s-io/kine/pkg/metrics"
	"github.com/k3s-io/kine/pkg/server"
	"github.com/k3s-io/kine/pkg/util"
	"github.com/sirupsen/logrus"
)
// Compile-time assertion that *Tx implements server.Transaction; the build
// fails here if a required method is missing or has the wrong signature.
var _ server.Transaction = (*Tx)(nil)
// Tx wraps a database/sql transaction together with the Generic driver whose
// SQL statements and error-code mapping the transaction methods use.
type Tx struct {
	// x is the underlying database/sql transaction all statements run on.
	x *sql.Tx
	// d is the Generic driver providing the dialect SQL strings and ErrCode.
	d *Generic
}
// BeginTx opens a new transaction on d.DB with the given options and returns
// it wrapped in a *Tx bound to d. If the database refuses to start the
// transaction, the error is returned unchanged and the transaction is nil.
func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (server.Transaction, error) {
	logrus.Tracef("TX BEGIN")

	sqlTx, err := d.DB.BeginTx(ctx, opts)
	if err != nil {
		return nil, err
	}

	wrapped := &Tx{x: sqlTx, d: d}
	return wrapped, nil
}
// Commit emits a trace log entry and commits the wrapped *sql.Tx, returning
// whatever error the commit produces.
func (t *Tx) Commit() error {
	logrus.Tracef("TX COMMIT")
	err := t.x.Commit()
	return err
}
// MustCommit commits the transaction and reports any failure through
// logrus.Fatalf instead of returning it to the caller.
func (t *Tx) MustCommit() {
	err := t.Commit()
	if err == nil {
		return
	}
	logrus.Fatalf("Transaction commit failed: %v", err)
}
// Rollback emits a trace log entry and rolls back the wrapped *sql.Tx,
// returning whatever error the rollback produces.
func (t *Tx) Rollback() error {
	logrus.Tracef("TX ROLLBACK")
	err := t.x.Rollback()
	return err
}
// MustRollback rolls the transaction back and reports any failure through
// logrus.Fatalf instead of returning it to the caller.
//
// sql.ErrTxDone is deliberately ignored so that MustRollback can be called
// (for example from a defer) on a transaction that has already been finished.
func (t *Tx) MustRollback() {
	err := t.Rollback()
	// errors.Is rather than != so that a wrapped sql.ErrTxDone is still
	// recognised as the benign "already finished" case.
	if err == nil || errors.Is(err, sql.ErrTxDone) {
		return
	}
	logrus.Fatalf("Transaction rollback failed: %v", err)
}
// GetCompactRevision runs compactRevSQL inside the transaction and returns
// the single int64 value it scans from the result row.
//
// If the query produces no row, 0 and a nil error are returned. Any other
// scan or query error is returned to the caller.
func (t *Tx) GetCompactRevision(ctx context.Context) (int64, error) {
	var id int64
	err := t.queryRow(ctx, compactRevSQL).Scan(&id)
	// errors.Is rather than == so that a wrapped sql.ErrNoRows is still
	// treated as "no row" instead of surfacing as a failure.
	if errors.Is(err, sql.ErrNoRows) {
		return 0, nil
	}
	return id, err
}
// SetCompactRevision executes the driver's UpdateCompactSQL statement inside
// the transaction with revision as its only argument. The statement result
// is discarded; only the error is returned.
func (t *Tx) SetCompactRevision(ctx context.Context, revision int64) error {
	logrus.Tracef("TX SETCOMPACTREVISION %v", revision)
	if _, err := t.execute(ctx, t.d.UpdateCompactSQL, revision); err != nil {
		return err
	}
	return nil
}
// Compact executes the driver's CompactSQL statement inside the transaction,
// binding revision to both of the statement's arguments, and returns the
// number of rows the statement reports as affected.
func (t *Tx) Compact(ctx context.Context, revision int64) (int64, error) {
	logrus.Tracef("TX COMPACT %v", revision)

	outcome, err := t.execute(ctx, t.d.CompactSQL, revision, revision)
	if err == nil {
		return outcome.RowsAffected()
	}
	return 0, err
}
// GetRevision runs the driver's GetRevisionSQL query inside the transaction
// with revision as its only argument and returns the resulting rows as-is.
func (t *Tx) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
	rows, err := t.query(ctx, t.d.GetRevisionSQL, revision)
	return rows, err
}
// DeleteRevision executes the driver's DeleteSQL statement inside the
// transaction with revision as its only argument. The statement result is
// discarded; only the error is returned.
func (t *Tx) DeleteRevision(ctx context.Context, revision int64) error {
	logrus.Tracef("TX DELETEREVISION %v", revision)
	if _, err := t.execute(ctx, t.d.DeleteSQL, revision); err != nil {
		return err
	}
	return nil
}
// CurrentRevision runs revSQL inside the transaction and returns the single
// int64 value it scans from the result row.
//
// If the query produces no row, 0 and a nil error are returned. Any other
// scan or query error is returned to the caller.
func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) {
	var id int64
	err := t.queryRow(ctx, revSQL).Scan(&id)
	// errors.Is rather than == so that a wrapped sql.ErrNoRows is still
	// treated as "no row" instead of surfacing as a failure.
	if errors.Is(err, sql.ErrNoRows) {
		return 0, nil
	}
	return id, err
}
// query runs a row-returning statement on the wrapped transaction. The
// statement is trace-logged before it runs, and on return the elapsed time,
// the driver's error code for err, the stripped statement text and the
// arguments are handed to metrics.ObserveSQL.
//
// The results are named so the deferred metrics call sees the final err.
func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
	logrus.Tracef("TX QUERY %v : %s", args, util.Stripped(sql))

	began := time.Now()
	defer func() {
		code := t.d.ErrCode(err)
		metrics.ObserveSQL(began, code, util.Stripped(sql), args)
	}()

	result, err = t.x.QueryContext(ctx, sql, args...)
	return result, err
}
// queryRow runs a single-row statement on the wrapped transaction. The
// statement is trace-logged before it runs, and on return the elapsed time,
// the driver's error code for the row's Err(), the stripped statement text
// and the arguments are handed to metrics.ObserveSQL.
//
// The result is named so the deferred metrics call can inspect the returned row.
func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
	logrus.Tracef("TX QUERY ROW %v : %s", args, util.Stripped(sql))

	began := time.Now()
	defer func() {
		code := t.d.ErrCode(result.Err())
		metrics.ObserveSQL(began, code, util.Stripped(sql), args)
	}()

	result = t.x.QueryRowContext(ctx, sql, args...)
	return result
}
// execute runs a statement that returns no rows on the wrapped transaction.
// The statement is trace-logged before it runs, and on return the elapsed
// time, the driver's error code for err, the stripped statement text and the
// arguments are handed to metrics.ObserveSQL.
//
// The results are named so the deferred metrics call sees the final err.
func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) {
	logrus.Tracef("TX EXEC %v : %s", args, util.Stripped(sql))

	began := time.Now()
	defer func() {
		code := t.d.ErrCode(err)
		metrics.ObserveSQL(began, code, util.Stripped(sql), args)
	}()

	result, err = t.x.ExecContext(ctx, sql, args...)
	return result, err
}