@@ -37,7 +37,7 @@ type CommandExecutor[C any, R any] interface {
37
37
type ExecutionController interface {
38
38
DrainAndBacklog () Signaler
39
39
Resume ()
40
- ShutdownSignaler () Signaler
40
+ WaitForShutdown () <- chan struct {}
41
41
}
42
42
43
43
type executorCoordinator []ExecutionController
@@ -65,35 +65,31 @@ func (coordinator executorCoordinator) Resume() {
65
65
}
66
66
}
67
67
68
- func (coordinator executorCoordinator ) ShutdownSignaler () Signaler {
69
- var signalers []Signaler
70
- for _ , executor := range coordinator {
71
- signalers = append (signalers , executor .ShutdownSignaler ())
72
- }
73
-
74
- signal := NewSignaler ()
68
+ func (coordinator executorCoordinator ) WaitForShutdown () <- chan struct {} {
69
+ signal := make (chan struct {})
75
70
go func () {
76
- defer signal . Send ( )
77
- for _ , signaler := range signalers {
78
- <- signaler . Receive ()
71
+ defer close ( signal )
72
+ for _ , executor := range coordinator {
73
+ <- executor . WaitForShutdown ()
79
74
}
80
75
}()
81
76
82
77
return signal
83
78
}
84
79
85
80
type commandExecutor [C any , R any ] struct {
86
- command func (context.Context , C ) (R , error )
87
- drainAndBacklogSig chan Signaler
88
- resumeBackloggedSig Signaler
89
- cmdChan chan Command [C , R ]
81
+ command func (context.Context , C ) (R , error )
82
+ cmdChan chan Command [C , R ]
90
83
// backlogChan contains all the commands that failed with EOF, waiting to be retried.
91
84
backlogChan chan Command [C , R ]
92
85
// inflightCmds keeps track of the number of commands that are currently being executed.
93
- inflightCmds sync.WaitGroup
94
- executeSig Signaler
95
- shutdownCompleteSig Signaler
96
- errBuff ErrorBuffer
86
+ inflightCmds sync.WaitGroup
87
+
88
+ resumeBackloggedSig Signaler
89
+ drainAndBacklogSig chan Signaler
90
+ shutdownCompleteSig chan struct {}
91
+
92
+ errBuff ErrorBuffer
97
93
98
94
backLogCommands bool
99
95
backlog []Command [C , R ]
@@ -119,9 +115,7 @@ func NewCommandExecutor[C any, R any](ctx context.Context, errBuff ErrorBuffer,
119
115
backlogChan : make (chan Command [C , R ], 100 ),
120
116
drainAndBacklogSig : make (chan Signaler , 100 ),
121
117
resumeBackloggedSig : NewSignaler (),
122
-
123
- executeSig : NewSignaler (),
124
- shutdownCompleteSig : NewSignaler (),
118
+ shutdownCompleteSig : make (chan struct {}),
125
119
}
126
120
127
121
go executor .loop (ctx )
@@ -135,7 +129,7 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
135
129
// doesn't wait on the channel provided.
136
130
defer func () {
137
131
defer stopCommands ()
138
- defer executor .shutdownCompleteSig . Close ( )
132
+ defer close ( executor .shutdownCompleteSig )
139
133
140
134
// close the cmdChan in case anything tries to write to it. This will ensure a panic occurs while trying to
141
135
// clean up any outstanding cmd.
@@ -150,13 +144,16 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
150
144
executor .drainBacklogChannel ()
151
145
executor .backlog = append (executor .backlog , ReadAll (executor .cmdChan )... )
152
146
153
- logrus .Debug ("Returning errors for outstanding requests due to shutdown." )
154
- for _ , cmd := range executor .backlog {
155
- cmd .ReturnError (context .Canceled )
147
+ if len (executor .backlog ) > 0 {
148
+ logrus .Debug ("Returning errors for outstanding commands due to shutdown..." )
149
+ for _ , cmd := range executor .backlog {
150
+ cmd .ReturnError (context .Canceled )
151
+ }
152
+ logrus .Debug ("Finished returning errors for outstanding commands." )
153
+ } else {
154
+ logrus .Debug ("No outstanding commands, shutting down.." )
156
155
}
157
- logrus .Debug ("Finished returning errors for outstanding requests due to shutdown." )
158
156
159
- executor .executeSig .Close ()
160
157
close (executor .drainAndBacklogSig )
161
158
executor .resumeBackloggedSig .Close ()
162
159
}()
@@ -179,6 +176,9 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
179
176
}
180
177
case cmd := <- executor .backlogChan :
181
178
logrus .Debugf ("Received backlog command (current backlog size: %d)." , len (executor .backlog ))
179
+ if len (executor .backlog ) > 50 {
180
+ logrus .Warningf ("Backlog size exceeded has exceed 50." )
181
+ }
182
182
executor .backlog = append (executor .backlog , cmd )
183
183
case signal := <- executor .drainAndBacklogSig :
184
184
logrus .Debugf ("Received requeue signal." )
@@ -250,6 +250,7 @@ func (executor *commandExecutor[C, R]) executeCommand(ctx context.Context, req C
250
250
defer executor .inflightCmds .Done ()
251
251
result , err := executor .command (ctx , req .Get ())
252
252
if err != nil {
253
+ logrus .Debugf ("Error executing command: %v" , err )
253
254
executor .errBuff .Write (err )
254
255
if errors .Is (err , io .EOF ) || errors .Is (err , context .Canceled ) {
255
256
executor .backlogChan <- req
@@ -287,6 +288,6 @@ func (executor *commandExecutor[Req, Resp]) Resume() {
287
288
executor .resumeBackloggedSig .Send ()
288
289
}
289
290
290
- func (executor * commandExecutor [C , R ]) ShutdownSignaler () Signaler {
291
+ func (executor * commandExecutor [C , R ]) WaitForShutdown () <- chan struct {} {
291
292
return executor .shutdownCompleteSig
292
293
}
0 commit comments