Skip to content

Commit 29c5745

Browse files
committed
fix: Resolve pool crash on start due to new event queue structure, improve admin API output, and fix streams not counting completedLines
1 parent 508dcb8 commit 29c5745

File tree

4 files changed

+9
-4
lines changed

4 files changed

+9
-4
lines changed

lc-lib/admin/api/data.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (n Bytes) HumanReadable(string) ([]byte, error) {
7070
size = float64(n) / 1024 / 1024 / 1024 / 1024
7171
}
7272

73-
return []byte(strconv.FormatFloat(size, 'g', 2, 64) + suffix), nil
73+
return []byte(strconv.FormatFloat(size, 'f', 2, 64) + suffix), nil
7474
}
7575

7676
// Float represents a floating point number in the API

lc-lib/publisher/endpoint/endpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (e *Endpoint) Init() {
7979
e.ctx = context.WithValue(context.Background(), ContextSelf, e)
8080

8181
e.warming = true
82-
backoffName := fmt.Sprintf("[E %s] Failure", e.poolEntry.Desc)
82+
backoffName := fmt.Sprintf("[E %s] Recovery", e.poolEntry.Desc)
8383
e.backoff = core.NewExpBackoff(backoffName, e.sink.config.Backoff, e.sink.config.BackoffMax)
8484

8585
e.readyElement.Value = e

lc-lib/receiver/pool.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,11 @@ func (r *Pool) Run() {
118118
ReceiverLoop:
119119
for {
120120
var nextSpool *spoolEntry = nil
121+
var nextSpoolEvents []*event.Event = nil
122+
121123
if len(r.spool) != 0 {
122124
nextSpool = r.spool[0]
125+
nextSpoolEvents = nextSpool.events
123126
}
124127

125128
select {
@@ -214,6 +217,8 @@ ReceiverLoop:
214217
} else {
215218
// Reset idle timeout
216219
r.startIdleTimeout(eventImpl.Context(), receiver, connection)
220+
// Count lines here as there is no ACK to do it
221+
connectionStatus.lines += int64(eventImpl.Count())
217222
}
218223
connectionStatus.bytes += eventImpl.Size()
219224
r.connectionLock.Unlock()
@@ -291,7 +296,7 @@ ReceiverLoop:
291296
r.startIdleTimeout(eventImpl.Context(), receiver, connection)
292297
}
293298
}
294-
case spoolChan <- nextSpool.events:
299+
case spoolChan <- nextSpoolEvents:
295300
copy(r.spool, r.spool[1:])
296301
r.spool = r.spool[:len(r.spool)-1]
297302
r.spoolSize -= int64(nextSpool.size)

lc-lib/receiver/status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (p *poolConnectionStatus) Update() error {
7070
p.SetEntry("listener", api.String(p.listener))
7171
p.SetEntry("description", api.String(p.desc))
7272
p.SetEntry("completedLines", api.Number(p.lines))
73-
p.SetEntry("completedBytes", api.Bytes(p.bytes))
73+
p.SetEntry("receivedBytes", api.Bytes(p.bytes))
7474
p.SetEntry("pendingPayloads", api.Number(len(p.progress)))
7575
return nil
7676
}

0 commit comments

Comments
 (0)