@@ -26,10 +26,10 @@ type Client interface {
26
26
matchers model.Matchers ,
27
27
selector labels.Selector ,
28
28
ignorePodOwnerTypes []string ,
29
- ) (model. ContainerListener , error )
29
+ ) (ContainerListener , error )
30
30
31
31
// CollectContainerDeltasForDuration collects container deltas from a listener for a given duration
32
- CollectContainerDeltasForDuration (listener model. ContainerListener , duration time.Duration ) (model.ContainerDeltaSet , error )
32
+ CollectContainerDeltasForDuration (listener ContainerListener , duration time.Duration ) (model.ContainerDeltaSet , error )
33
33
34
34
// GetContainerStatus returns the status of a container
35
35
GetContainerStatus (container model.Container ) (model.ContainerStatus , error )
@@ -50,13 +50,20 @@ func NewClient(ctx context.Context, clusterToClientset map[string]*kubernetes.Cl
50
50
}
51
51
}
52
52
53
+ type ContainerListener struct {
54
+ Cluster string
55
+ Namespace string
56
+ Stop func ()
57
+ containerDeltaChan chan model.ContainerDelta
58
+ }
59
+
53
60
func (c clientImpl ) GetContainerListener (
54
61
cluster ,
55
62
namespace string ,
56
63
matchers model.Matchers ,
57
64
selector labels.Selector ,
58
65
ignorePodOwnerTypes []string ,
59
- ) (model. ContainerListener , error ) {
66
+ ) (ContainerListener , error ) {
60
67
deltaChan := make (chan model.ContainerDelta , 100 )
61
68
stopChan := make (chan struct {})
62
69
@@ -113,7 +120,7 @@ func (c clientImpl) GetContainerListener(
113
120
},
114
121
})
115
122
if err != nil {
116
- return model. ContainerListener {}, fmt .Errorf ("error adding event handler: %v" , err )
123
+ return ContainerListener {}, fmt .Errorf ("error adding event handler: %v" , err )
117
124
}
118
125
119
126
go func () {
@@ -122,32 +129,32 @@ func (c clientImpl) GetContainerListener(
122
129
123
130
if ! cache .WaitForCacheSync (stopChan , podInformer .HasSynced ) {
124
131
close (stopChan )
125
- return model. ContainerListener {}, fmt .Errorf ("timed out waiting for caches to sync" )
132
+ return ContainerListener {}, fmt .Errorf ("timed out waiting for caches to sync" )
126
133
}
127
134
128
135
stop := func () {
129
136
close (stopChan )
130
137
close (deltaChan )
131
138
}
132
139
133
- return model. ContainerListener {
140
+ return ContainerListener {
134
141
Cluster : cluster ,
135
142
Namespace : namespace ,
136
- ContainerDeltaChan : deltaChan ,
143
+ containerDeltaChan : deltaChan ,
137
144
Stop : stop ,
138
145
}, nil
139
146
}
140
147
141
148
func (c clientImpl ) CollectContainerDeltasForDuration (
142
- listener model. ContainerListener ,
149
+ listener ContainerListener ,
143
150
duration time.Duration ,
144
151
) (model.ContainerDeltaSet , error ) {
145
152
var deltas model.ContainerDeltaSet
146
153
timeout := time .After (duration )
147
154
148
155
for {
149
156
select {
150
- case containerDelta , ok := <- listener .ContainerDeltaChan :
157
+ case containerDelta , ok := <- listener .containerDeltaChan :
151
158
if ! ok {
152
159
return model.ContainerDeltaSet {}, fmt .Errorf ("add/update pod channel closed" )
153
160
}
0 commit comments