Skip to content

Commit 2a5fd89

Browse files
committed
chore: Shift max queue size to general as it is global not receiver specific and remove redundant pending counts for non-ack connections
1 parent 29c5745 commit 2a5fd89

File tree

5 files changed

+23
-17
lines changed

5 files changed

+23
-17
lines changed

lc-lib/config/general.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ import (
3030
var registeredGeneralCreators = make(map[string]SectionCreator)
3131

3232
const (
33-
defaultGeneralHost string = "localhost.localdomain"
34-
defaultGeneralLogLevel logging.Level = logging.INFO
35-
defaultGeneralLogStdout bool = true
36-
defaultGeneralLogSyslog bool = false
33+
defaultGeneralHost string = "localhost.localdomain"
34+
defaultGeneralLogLevel logging.Level = logging.INFO
35+
defaultGeneralLogStdout bool = true
36+
defaultGeneralLogSyslog bool = false
37+
defaultGeneralMaxQueueSize int64 = 128 * 1024 * 1024 // 128 MiB
3738
)
3839

3940
// General holds the general configuration
@@ -44,6 +45,7 @@ type General struct {
4445
LogLevel logging.Level `config:"log level"`
4546
LogStdout bool `config:"log stdout"`
4647
LogSyslog bool `config:"log syslog"`
48+
MaxQueueSize int64 `config:"max queue size"`
4749

4850
Custom map[string]interface{} `config:",embed_dynamic"`
4951
}
@@ -95,10 +97,11 @@ func RegisterGeneral(name string, creator SectionCreator) {
9597
func init() {
9698
RegisterSection("general", func() interface{} {
9799
c := &General{
98-
LogLevel: defaultGeneralLogLevel,
99-
LogStdout: defaultGeneralLogStdout,
100-
LogSyslog: defaultGeneralLogSyslog,
101-
Custom: make(map[string]interface{}),
100+
LogLevel: defaultGeneralLogLevel,
101+
LogStdout: defaultGeneralLogStdout,
102+
LogSyslog: defaultGeneralLogSyslog,
103+
MaxQueueSize: defaultGeneralMaxQueueSize,
104+
Custom: make(map[string]interface{}),
102105
}
103106

104107
for k, creator := range registeredGeneralCreators {

lc-lib/receiver/pool.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Pool struct {
6161
spool []*spoolEntry
6262
spoolSize int64
6363

64+
generalConfig *config.General
6465
apiConfig *admin.Config
6566
apiConnections api.Array
6667
apiListeners api.Array
@@ -72,6 +73,7 @@ type Pool struct {
7273
func NewPool(app *core.App) *Pool {
7374
return &Pool{
7475
apiConfig: admin.FetchConfig(app.Config()),
76+
generalConfig: app.Config().General(),
7577
ackChan: make(chan []*event.Event),
7678
eventChan: make(chan transports.Event),
7779
scheduler: scheduler.NewScheduler(),
@@ -184,7 +186,7 @@ ReceiverLoop:
184186
receiver := eventImpl.Context().Value(transports.ContextReceiver).(transports.Receiver)
185187
r.startIdleTimeout(eventImpl.Context(), receiver, connection)
186188
r.connectionLock.Lock()
187-
connectionStatus := newPoolConnectionStatus(r, r.receivers[receiver].config.Name, r.receivers[receiver].listen, eventImpl.Remote(), eventImpl.Desc())
189+
connectionStatus := newPoolConnectionStatus(r, r.receivers[receiver].config.Name, r.receivers[receiver].listen, eventImpl.Remote(), eventImpl.Desc(), receiver.SupportsAck())
188190
r.connectionStatus[connection] = connectionStatus
189191
r.connectionLock.Unlock()
190192
// ReplaceEntry outside of connection lock, as entry updates that hold the entry lock will take the connection lock
@@ -196,7 +198,7 @@ ReceiverLoop:
196198
connection := eventImpl.Context().Value(transports.ContextConnection)
197199
receiver := eventImpl.Context().Value(transports.ContextReceiver).(transports.Receiver)
198200
connectionStatus := r.connectionStatus[connection]
199-
if r.spoolSize+int64(size) > r.receivers[receiver].config.MaxQueueSize {
201+
if r.spoolSize+int64(size) > r.generalConfig.MaxQueueSize {
200202
receiver.ShutdownConnectionRead(eventImpl.Context(), fmt.Errorf("max queue size exceeded"))
201203
r.connectionLock.Unlock()
202204
break
@@ -418,8 +420,10 @@ func (r *Pool) updateReceivers(newConfig *config.Config) {
418420
newReceivers[newReceiversByListen[listen]] = &poolReceiverStatus{config: cfgEntry, listen: listen, active: true}
419421
receiverApi := &api.KeyValue{}
420422
receiverApi.SetEntry("listen", api.String(listen))
421-
receiverApi.SetEntry("maxPendingPayloads", api.Number(cfgEntry.MaxPendingPayloads))
422-
receiverApi.SetEntry("maxQueueSize", api.Number(cfgEntry.MaxQueueSize))
423+
if newReceiversByListen[listen].SupportsAck() {
424+
// Only applies if supports ack
425+
receiverApi.SetEntry("maxPendingPayloads", api.Number(cfgEntry.MaxPendingPayloads))
426+
}
423427
r.apiListeners.AddEntry(listen, receiverApi)
424428
}
425429
}

lc-lib/receiver/status.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ type poolConnectionStatus struct {
4141
progress []*poolEventProgress
4242
lines int64
4343
bytes int
44+
supportsAck bool
4445

4546
api.KeyValue
4647
}
4748

48-
func newPoolConnectionStatus(p *Pool, name string, listener string, remote string, desc string) *poolConnectionStatus {
49+
func newPoolConnectionStatus(p *Pool, name string, listener string, remote string, desc string, supportsAck bool) *poolConnectionStatus {
4950
return &poolConnectionStatus{
5051
p: p,
5152
name: name,
@@ -58,7 +59,8 @@ func newPoolConnectionStatus(p *Pool, name string, listener string, remote strin
5859
"remote": remote,
5960
"desc": desc,
6061
},
61-
progress: make([]*poolEventProgress, 0),
62+
progress: make([]*poolEventProgress, 0),
63+
supportsAck: supportsAck,
6264
}
6365
}
6466

lc-lib/transports/common.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ const (
3333
defaultNetworkBackoff time.Duration = 5 * time.Second
3434
defaultNetworkBackoffMax time.Duration = 300 * time.Second
3535
defaultNetworkMaxPendingPayloads int64 = 10
36-
defaultNetworkMaxQueueSize int64 = 128 * 1024 * 1024 // 128 MiB
3736
defaultNetworkMethod string = "random"
3837
defaultNetworkRfc2782Service string = "courier"
3938
defaultNetworkRfc2782Srv bool = true

lc-lib/transports/configreceiver.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ type ReceiverConfigEntry struct {
3838
Transport string `config:"transport"`
3939
Listen []string `config:"listen"`
4040
MaxPendingPayloads int64 `config:"max pending payloads"`
41-
MaxQueueSize int64 `config:"max queue size"`
4241

4342
Unused map[string]interface{} `json:",omitempty"`
4443
}
@@ -48,7 +47,6 @@ func (c *ReceiverConfigEntry) Defaults() {
4847
c.Enabled = true
4948
c.Transport = defaultReceiverTransport
5049
c.MaxPendingPayloads = defaultNetworkMaxPendingPayloads
51-
c.MaxQueueSize = defaultNetworkMaxQueueSize
5250
}
5351

5452
// Init the receiver configuration

0 commit comments

Comments
 (0)