Skip to content

Refactor code, add tests #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 5, 2025
144 changes: 93 additions & 51 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

func (c *controller) adjustInterval(ctx context.Context, req adjustIntervalRequest) {
func (c *ctrlBackend) adjustInterval(ctx context.Context, req adjustIntervalRequest) {
interval := roundupToSeconds(time.Until(req.resource.Next()))
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got adjust request (current tick interval=%s, next for %q=%s)", c.tickInterval, req.resource.URL(), interval))

Expand All @@ -24,7 +24,7 @@ func (c *controller) adjustInterval(ctx context.Context, req adjustIntervalReque
}
}

func (c *controller) addResource(ctx context.Context, req addRequest) {
func (c *ctrlBackend) addResource(ctx context.Context, req addRequest) {
r := req.resource
if _, ok := c.items[r.URL()]; ok {
// Already exists
Expand All @@ -41,13 +41,13 @@ func (c *controller) addResource(ctx context.Context, req addRequest) {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: set minimum interval to %s", c.defaultMinInterval))
r.SetMinInterval(c.defaultMinInterval)
}
close(req.reply)

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: added resource %q", r.URL()))
sendReply(ctx, req.reply, struct{}{}, nil)
c.SetTickInterval(time.Nanosecond)
}

func (c *controller) rmResource(ctx context.Context, req rmRequest) {
func (c *ctrlBackend) rmResource(ctx context.Context, req rmRequest) {
u := req.u
if _, ok := c.items[u]; !ok {
sendReply(ctx, req.reply, struct{}{}, errResourceNotFound)
Expand All @@ -67,21 +67,33 @@ func (c *controller) rmResource(ctx context.Context, req rmRequest) {
c.check.Reset(minInterval)
}

func (c *controller) refreshResource(ctx context.Context, req refreshRequest) {
func (c *ctrlBackend) refreshResource(ctx context.Context, req refreshRequest) {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: [refresh] START %q", req.u))
defer c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: [refresh] END %q", req.u))
u := req.u
r, ok := c.items[u]
if !ok {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: [refresh] %s is not registered", req.u))
sendReply(ctx, req.reply, struct{}{}, errResourceNotFound)
return
}

// Make sure it's ready
if err := r.Ready(ctx); err != nil {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: [refresh] %s did not become ready: %v", req.u, err))
sendReply(ctx, req.reply, struct{}{}, err)
return
}

r.SetNext(time.Unix(0, 0))
sendWorkerSynchronous(ctx, c.syncoutgoing, synchronousRequest{
resource: r,
reply: req.reply,
})
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: [refresh] sync request for %s sent to worker pool", req.u))
}

func (c *controller) lookupResource(ctx context.Context, req lookupRequest) {
func (c *ctrlBackend) lookupResource(ctx context.Context, req lookupRequest) {
u := req.u
r, ok := c.items[u]
if !ok {
Expand All @@ -91,7 +103,7 @@ func (c *controller) lookupResource(ctx context.Context, req lookupRequest) {
sendReply(ctx, req.reply, r, nil)
}

func (c *controller) handleRequest(ctx context.Context, req any) {
func (c *ctrlBackend) handleRequest(ctx context.Context, req any) {
switch req := req.(type) {
case adjustIntervalRequest:
c.adjustInterval(ctx, req)
Expand Down Expand Up @@ -132,64 +144,94 @@ func sendReply[T any](ctx context.Context, ch chan backendResponse[T], v T, err
}
}

func (c *controller) loop(ctx context.Context, wg *sync.WaitGroup) {
type ctrlBackend struct {
items map[string]Resource
outgoing chan Resource
syncoutgoing chan synchronousRequest
incoming chan any // incoming requests to the controller
traceSink TraceSink
tickInterval time.Duration
check *time.Ticker
defaultMaxInterval time.Duration
defaultMinInterval time.Duration
}

func (c *ctrlBackend) loop(ctx context.Context, readywg, donewg *sync.WaitGroup) {
c.traceSink.Put(ctx, "httprc controller: starting main controller loop")
readywg.Done()
defer c.traceSink.Put(ctx, "httprc controller: stopping main controller loop")
defer wg.Done()
defer donewg.Done()
for {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: waiting for request or tick (tick interval=%s)", c.tickInterval))
select {
case req := <-c.incoming:
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got request %T", req))
c.handleRequest(ctx, req)
case t := <-c.check.C:
var minNext time.Time
var dispatched int
minInterval := -1 * time.Second
for _, item := range c.items {
next := item.Next()
if minNext.IsZero() || next.Before(minNext) {
minNext = next
}

if interval := item.MinInterval(); minInterval < 0 || interval < minInterval {
minInterval = interval
}

if item.IsBusy() || next.After(t) {
continue
}

dispatched++
sendWorker(ctx, c.outgoing, item)
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatched %d resources", dispatched))

// Next check is always at the earliest next check + 1 second.
// The extra second makes sure that we are _past_ the actual next check time
// so we can send the resource to the worker pool
if interval := time.Until(minNext); interval > 0 {
c.SetTickInterval(roundupToSeconds(interval) + time.Second)
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resetting check intervanl to %s", c.tickInterval))
} else {
// if we got here, either we have no resources, or all resources are busy.
// In this state, it's possible that the interval is less than 1 second,
// because we previously set ti to a small value for an immediate refresh.
// in this case, we want to reset it to a sane value
if c.tickInterval < time.Second {
c.SetTickInterval(minInterval)
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resetting check intervanl to %s after forced refresh", c.tickInterval))
}
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: next check in %s", c.tickInterval))
c.periodicCheck(ctx, t)
case <-ctx.Done():
return
}
}
}

func (c *controller) SetTickInterval(d time.Duration) {
func (c *ctrlBackend) periodicCheck(ctx context.Context, t time.Time) {
c.traceSink.Put(ctx, "httprc controller: START periodic check")
defer c.traceSink.Put(ctx, "httprc controller: END periodic check")
var minNext time.Time
var dispatched int
minInterval := -1 * time.Second
for _, item := range c.items {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: checking resource %q", item.URL()))

next := item.Next()
if minNext.IsZero() || next.Before(minNext) {
minNext = next
}

if interval := item.MinInterval(); minInterval < 0 || interval < minInterval {
minInterval = interval
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q isBusy=%t, next(%s).After(%s)=%t", item.URL(), item.IsBusy(), next, t, next.After(t)))
if item.IsBusy() || next.After(t) {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q is busy or not ready yet, skipping", item.URL()))
continue
}
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q is ready, dispatching to worker pool", item.URL()))

dispatched++
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatching resource %q to worker pool", item.URL()))
sendWorker(ctx, c.outgoing, item)
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatched %d resources", dispatched))

// Next check is always at the earliest next check + 1 second.
// The extra second makes sure that we are _past_ the actual next check time
// so we can send the resource to the worker pool
if interval := time.Until(minNext); interval > 0 {
c.SetTickInterval(roundupToSeconds(interval) + time.Second)
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resetting check intervanl to %s", c.tickInterval))
} else {
// if we got here, either we have no resources, or all resources are busy.
// In this state, it's possible that the interval is less than 1 second,
// because we previously set it to a small value for an immediate refresh.
// in this case, we want to reset it to a sane value
if c.tickInterval < time.Second {
c.SetTickInterval(minInterval)
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resetting check intervanl to %s after forced refresh", c.tickInterval))
}
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: next check in %s", c.tickInterval))
}

func (c *ctrlBackend) SetTickInterval(d time.Duration) {
// TODO synchronize
if d <= 0 {
d = time.Second // ensure positive interval
}
c.tickInterval = d
c.check.Reset(d)
}
108 changes: 54 additions & 54 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,28 @@ import (
"github.com/lestrrat-go/httprc/v3/tracesink"
)

// setupSink creates and starts a proxy for the given sink if it's not a Nop sink
// Returns the sink to use and a cancel function that should be chained with the original cancel
func setupSink[T any, S proxysink.Backend[T], NopType any](ctx context.Context, sink S, wg *sync.WaitGroup) (S, context.CancelFunc) {
if _, ok := any(sink).(NopType); ok {
return sink, func() {}
}

proxy := proxysink.New[T](sink)
wg.Add(1)
go func(ctx context.Context, wg *sync.WaitGroup, proxy *proxysink.Proxy[T]) {
defer wg.Done()
proxy.Run(ctx)
}(ctx, wg, proxy)

// proxy can be converted to one of the sink subtypes
s, ok := any(proxy).(S)
if !ok {
panic("type assertion failed: proxy cannot be converted to type S")
}
return s, proxy.Close
}

// Client is the main entry point for the httprc package.
type Client struct {
mu sync.Mutex
Expand All @@ -24,17 +46,6 @@ type Client struct {
defaultMinInterval time.Duration
}

const DefaultWorkers = 5

// DefaultMaxInterval is the default maximum interval between fetches
const DefaultMaxInterval = 24 * time.Hour * 30

// DefaultMinInterval is the default minimum interval between fetches.
const DefaultMinInterval = 15 * time.Minute

// used internally
const oneDay = 24 * time.Hour

// NewClient creates a new `httprc.Client` object.
//
// By default ALL urls are allowed. This may not be suitable for you if
Expand Down Expand Up @@ -101,48 +112,29 @@ func (c *Client) Start(octx context.Context) (Controller, error) {
// controller to cancel this context.
ctx, cancel := context.WithCancel(octx)

var wg sync.WaitGroup
var donewg sync.WaitGroup

// start proxy goroutines that will accept sink requests
// and forward them to the appropriate sink
var errSink ErrorSink
if _, ok := c.errSink.(errsink.Nop); ok {
errSink = c.errSink
} else {
proxy := proxysink.New[error](c.errSink)
wg.Add(1)
go func(wg *sync.WaitGroup, proxy *proxysink.Proxy[error]) {
defer wg.Done()
proxy.Run(ctx)
}(&wg, proxy)

errSink = proxy
errSink, errCancel := setupSink[error, ErrorSink, errsink.Nop](ctx, c.errSink, &donewg)
traceSink, traceCancel := setupSink[string, TraceSink, tracesink.Nop](ctx, c.traceSink, &donewg)

// Chain the cancel functions
ocancel := cancel
cancel = func() {
ocancel()
errCancel()
traceCancel()
}

var traceSink TraceSink
if _, ok := c.traceSink.(tracesink.Nop); ok {
traceSink = c.traceSink
} else {
proxy := proxysink.New[string](c.traceSink)
wg.Add(1)
go func(wg *sync.WaitGroup, proxy *proxysink.Proxy[string]) {
defer wg.Done()
proxy.Run(ctx)
}(&wg, proxy)

ocancel := cancel
cancel = func() {
ocancel()
proxy.Close()
}

traceSink = proxy
}
chbuf := c.numWorkers + 1
incoming := make(chan any, chbuf)
outgoing := make(chan Resource, chbuf)
syncoutgoing := make(chan synchronousRequest, chbuf)

incoming := make(chan any, c.numWorkers)
outgoing := make(chan Resource, c.numWorkers)
syncoutgoing := make(chan synchronousRequest, c.numWorkers)
wg.Add(c.numWorkers)
var readywg sync.WaitGroup
readywg.Add(c.numWorkers)
donewg.Add(c.numWorkers)
for range c.numWorkers {
wrk := worker{
incoming: incoming,
Expand All @@ -152,32 +144,40 @@ func (c *Client) Start(octx context.Context) (Controller, error) {
traceSink: traceSink,
httpcl: c.httpcl,
}
go wrk.Run(ctx, &wg)
go wrk.Run(ctx, &readywg, &donewg)
}

tickInterval := oneDay
ctrl := &controller{
cancel: cancel,
cancel: cancel,
incoming: incoming,
shutdown: make(chan struct{}),
traceSink: traceSink,
wl: c.wl,
}

backend := &ctrlBackend{
items: make(map[string]Resource),
outgoing: outgoing,
syncoutgoing: syncoutgoing,
incoming: incoming,
traceSink: traceSink,
tickInterval: tickInterval,
check: time.NewTicker(tickInterval),
shutdown: make(chan struct{}),
wl: c.wl,

defaultMinInterval: c.defaultMinInterval,
defaultMaxInterval: c.defaultMaxInterval,
}
wg.Add(1)
go ctrl.loop(ctx, &wg)
donewg.Add(1)
readywg.Add(1)
go backend.loop(ctx, &readywg, &donewg)

go func(wg *sync.WaitGroup, ch chan struct{}) {
wg.Wait()
close(ch)
}(&wg, ctrl.shutdown)
}(&donewg, ctrl.shutdown)

readywg.Wait()

return ctrl, nil
}
Loading