@@ -13,7 +13,6 @@ import (
13
13
14
14
"github.com/spf13/afero"
15
15
16
- "github.com/bpineau/katafygio/config"
17
16
"github.com/bpineau/katafygio/pkg/event"
18
17
)
19
18
22
21
crc64Table = crc64 .MakeTable (crc64 .ECMA )
23
22
)
24
23
24
+ type logger interface {
25
+ Infof (format string , args ... interface {})
26
+ Errorf (format string , args ... interface {})
27
+ }
28
+
25
29
// activeFiles will contain a list of active (present in cluster) objets; we'll
26
30
// use that to periodically find and garbage collect stale objets in the git repos
27
31
// (ie. if some objects were delete from cluster while katafygio was not running),
@@ -30,36 +34,42 @@ type activeFiles map[string]uint64
30
34
31
35
// Listener receive events from controllers and save them to disk as yaml files
32
36
type Listener struct {
33
- config * config. KfConfig
37
+ logger logger
34
38
events event.Notifier
35
39
actives activeFiles
36
40
activesLock sync.RWMutex
41
+ localDir string
42
+ gcInterval time.Duration
43
+ dryRun bool
37
44
stopch chan struct {}
38
45
donech chan struct {}
39
46
}
40
47
41
48
// New creates a new event Listener
42
- func New (config * config. KfConfig , events event.Notifier ) * Listener {
49
+ func New (log logger , events event.Notifier , localDir string , gcInterval int , dryRun bool ) * Listener {
43
50
return & Listener {
44
- config : config ,
45
- events : events ,
46
- actives : activeFiles {},
47
- stopch : make (chan struct {}),
48
- donech : make (chan struct {}),
51
+ logger : log ,
52
+ events : events ,
53
+ actives : activeFiles {},
54
+ localDir : localDir ,
55
+ dryRun : dryRun ,
56
+ gcInterval : time .Duration (gcInterval ) * time .Second ,
57
+ stopch : make (chan struct {}),
58
+ donech : make (chan struct {}),
49
59
}
50
60
}
51
61
52
62
// Start continuously receive events and saves them to disk as files
53
63
func (w * Listener ) Start () * Listener {
54
- w .config . Logger . Info ("Starting event recorder" )
55
- err := appFs .MkdirAll (filepath .Clean (w .config . LocalDir ), 0700 )
64
+ w .logger . Infof ("Starting event recorder" )
65
+ err := appFs .MkdirAll (filepath .Clean (w .localDir ), 0700 )
56
66
if err != nil {
57
- panic (fmt .Sprintf ("Can't create directory %s: %v" , w .config . LocalDir , err ))
67
+ panic (fmt .Sprintf ("Can't create directory %s: %v" , w .localDir , err ))
58
68
}
59
69
60
70
go func () {
61
71
evCh := w .events .ReadChan ()
62
- gcTick := time .NewTicker (w .config . ResyncIntv * 2 )
72
+ gcTick := time .NewTicker (w .gcInterval )
63
73
defer gcTick .Stop ()
64
74
defer close (w .donech )
65
75
@@ -80,15 +90,15 @@ func (w *Listener) Start() *Listener {
80
90
81
91
// Stop halts the recorder service
82
92
func (w * Listener ) Stop () {
83
- w .config . Logger . Info ("Stopping event recorder" )
93
+ w .logger . Infof ("Stopping event recorder" )
84
94
close (w .stopch )
85
95
<- w .donech
86
96
}
87
97
88
98
func (w * Listener ) processNextEvent (ev * event.Notification ) {
89
- path , err := getPath (w .config . LocalDir , ev )
99
+ path , err := getPath (w .localDir , ev )
90
100
if err != nil {
91
- w .config . Logger .Errorf ("failed to get %s path: %v" , ev .Key , err )
101
+ w .logger .Errorf ("failed to get %s path: %v" , ev .Key , err )
92
102
}
93
103
94
104
switch ev .Action {
@@ -99,7 +109,7 @@ func (w *Listener) processNextEvent(ev *event.Notification) {
99
109
}
100
110
101
111
if err != nil {
102
- w .config . Logger .Errorf ("failed to delete or save %s: %v" , ev .Key , err )
112
+ w .logger .Errorf ("failed to delete or save %s: %v" , ev .Key , err )
103
113
}
104
114
}
105
115
@@ -115,8 +125,7 @@ func getPath(root string, ev *event.Notification) (string, error) {
115
125
}
116
126
117
127
func (w * Listener ) remove (file string ) error {
118
- w .config .Logger .Debugf ("Removing %s from disk" , file )
119
- if w .config .DryRun {
128
+ if w .dryRun {
120
129
return nil
121
130
}
122
131
@@ -127,14 +136,12 @@ func (w *Listener) remove(file string) error {
127
136
}
128
137
129
138
func (w * Listener ) relativePath (file string ) string {
130
- root := filepath .Clean (w .config . LocalDir )
139
+ root := filepath .Clean (w .localDir )
131
140
return strings .Replace (file , root + "/" , "" , 1 )
132
141
}
133
142
134
143
func (w * Listener ) save (file string , data []byte ) error {
135
- w .config .Logger .Debugf ("Saving %s to disk" , file )
136
-
137
- if w .config .DryRun {
144
+ if w .dryRun {
138
145
return nil
139
146
}
140
147
@@ -182,7 +189,7 @@ func (w *Listener) save(file string, data []byte) error {
182
189
func (w * Listener ) deleteObsoleteFiles () {
183
190
w .activesLock .RLock ()
184
191
defer w .activesLock .RUnlock ()
185
- root := filepath .Clean (w .config . LocalDir )
192
+ root := filepath .Clean (w .localDir )
186
193
187
194
err := afero .Walk (appFs , root , func (path string , info os.FileInfo , err error ) error {
188
195
if info .IsDir () {
@@ -198,15 +205,14 @@ func (w *Listener) deleteObsoleteFiles() {
198
205
return nil
199
206
}
200
207
201
- w .config .Logger .Debugf ("Removing %s from disk" , path )
202
- if ! w .config .DryRun {
208
+ if ! w .dryRun {
203
209
return appFs .Remove (filepath .Clean (path ))
204
210
}
205
211
206
212
return nil
207
213
})
208
214
209
215
if err != nil {
210
- w .config . Logger . Warnf ("failed to gc some files: %v" , err )
216
+ w .logger . Errorf ("failed to gc some files: %v" , err )
211
217
}
212
218
}
0 commit comments