remove head of line blocking from workerpool #4648
Conversation
friendly ping on this, small PR that improves the current worker pool
@steeling could you please provide more context either via an issue and/or through a more verbose PR && commit description. Given the existing implementation, it is easy to miss the specifics of what the problem is and how this change addresses the problem.
pkg/workerpool/workerpool.go (Outdated)
case <-workContext.stop:
log.Debug().Msgf("work[%d]: Stopped", workContext.id)
log.Debug().Msgf("work[%d][%s] : took %v", id, j.JobName(), time.Since(t))
atomic.AddUint64(&wp.jobsProcessed, 1)
For my understanding: what's the advantage of using the atomic package over a mutex here? I haven't used atomic much FWIW, so I'm curious
I think atomic operations are generally faster, since the logic is simpler than a mutex and allows lower-level optimization. But atomics do not guarantee ordering when reads and writes happen at the same time.
In general, atomic operations perform faster because they can use special CPU instructions for atomic writes, rather than the underlying thread-synchronization machinery a mutex relies on.
My guess, though, is that the Go compiler can optimize simple mutex usages. Still, atomic is there for a reason, so I think it makes sense to use it where called for.
One added benefit: it's impossible to get yourself into a deadlock with the atomic package.
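To make the comparison concrete, here's a minimal hypothetical sketch (not this PR's code) of the same counter maintained both ways; both are correct, but the atomic version needs no lock and cannot deadlock.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup

	// Mutex version: the lock serializes the read-modify-write.
	var mu sync.Mutex
	var mutexCount uint64

	// Atomic version: one CPU-level atomic add, no lock to mismanage.
	var atomicCount uint64

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			mu.Lock()
			mutexCount++
			mu.Unlock()

			atomic.AddUint64(&atomicCount, 1)
		}()
	}
	wg.Wait()
	fmt.Println(mutexCount, atomic.LoadUint64(&atomicCount)) // both print 100
}
```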
For posterity, here's what I found that informs my understanding of the atomic package (emphasis mine):
IIUC, atomicity only guarantees memory and CPU cache coherence with
regard to the integrity of the value being manipulated -- that is, it
only guarantees that if one CPU is writing the value while another one
is reading it, the second one won't get a value which has some of its
bits from the "old" value stored at that memory location before the
first CPU started overwriting it, and other bits from the "new" value
being written. What atomicity does not guarantee is any ordering of
observability of values.
Functions like atomic.AddInt32 do a little more than ensuring
integrity. On most processors adding a value to memory requires a
read-modify-write memory cycle (this is true even if there is a single
"add to memory" instruction, as there is on x86). On a multicore machine
if two cores execute the read-modify-write cycle simultaneously it is
possible for one of the additions to be lost. atomic.AddInt32
guarantees that no additions are lost.
So I guess my question is: are you concerned about the integrity of the counter or the observability of the counter? My guess is the former; on a multi-core machine, the jobs-processed statistic could be corrupted without guaranteed atomicity. I don't know enough about how that stat is used, so I'll defer to you @steeling on the final answer.
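To illustrate the "lost addition" failure mode from the quote, here's a hypothetical repro (not this PR's code): a plain counter++ is a racy read-modify-write, while atomic.AddUint64 never drops an increment.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var plain, safe uint64
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			plain++                    // data race: concurrent increments can overwrite each other
			atomic.AddUint64(&safe, 1) // integrity guaranteed: no additions lost
		}()
	}
	wg.Wait()
	// On a multicore machine, plain frequently ends up below 1000;
	// safe is always exactly 1000. `go run -race` flags the plain++ race.
	fmt.Println(plain, atomic.LoadUint64(&safe))
}
```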
Nice find!
Welp, I looked at what we were using it for, and it turns out the counter isn't being used at all. It was previously used only in tests, and not with the new format. More code removed!
workPool.nWorkers++
go (workPool.workerContext[i]).work()
i := i
What does this line do?
It's to get around this: https://dev.to/kkentzo/the-golang-for-loop-gotcha-1n35
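For anyone who hits this later, a hypothetical illustration of the gotcha (this reflects pre-Go 1.22 semantics; since Go 1.22 the loop variable is per-iteration and the shadow is unnecessary):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		i := i // shadow: give this iteration's closure its own copy of i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// With the shadow, this prints 0, 1, 2 (in some order).
			// Without it (before Go 1.22), all goroutines may print 3,
			// because they share the single loop variable.
			fmt.Println(i)
		}()
	}
	wg.Wait()
}
```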
@shashankram done!
Thanks for the PR description edit @steeling! Sidenote: this is why I use the self-checkout at grocery stores 😉
Signed-off-by: Sean Teeling <[email protected]>
ping on this
friendly ping on this @jaellio @nojnhuh @shashankram |
This PR accomplishes the following:
Removes head-of-line blocking. In the current implementation, if a single worker is busy, jobs queued on that worker's dedicated queue get backed up: the system as a whole makes progress, but none of the jobs in that queue do. A single shared queue keeps the system's overall throughput the same while reducing how long any one job can be delayed behind a blockage, because any free worker can pick up the next job. This follows from queueing theory: one queue feeding many servers outperforms a separate queue per server. In layman's terms, think of a grocery store where each register has its own line versus one line that feeds all registers.
Simplifies the code by removing structs and concepts that are no longer needed. Code is read more than it is written, so the simplification here is a large added bonus.
Leverages a tried-and-true, well-known, and recommended implementation for worker pools (see the sketch below): https://gobyexample.com/worker-pools
See the wiki entry on queueing theory.
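For readers unfamiliar with the pattern, here is a minimal sketch of the single-shared-queue worker pool described above, modeled on https://gobyexample.com/worker-pools; the names (worker, jobs) are illustrative, not this PR's actual types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker drains the shared jobs channel. Because every worker reads from
// the same channel, a slow job delays only the worker running it; any
// free worker picks up the next queued job.
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		time.Sleep(10 * time.Millisecond) // simulate work
		fmt.Printf("worker %d finished job %d\n", id, j)
	}
}

func main() {
	jobs := make(chan int, 100) // the single shared queue
	var wg sync.WaitGroup
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go worker(w, jobs, &wg)
	}
	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs) // no more work; workers exit once the channel drains
	wg.Wait()
}
```

Contrast this with per-worker queues, where a job could sit behind a stuck job on one worker's queue even while other workers sit idle.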