Skip to content

Commit 8e6fc1e

Browse files
authored
Fix concurrency bug in calendar cron (#17832)
#17441
1 parent 16c0fb5 commit 8e6fc1e

File tree

3 files changed

+503
-34
lines changed

3 files changed

+503
-34
lines changed

cmd/fleet/calendar_cron.go

+35 -25
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"slices"
8+
"sync"
89
"time"
910

1011
"github.com/fleetdm/fleet/v4/ee/server/calendar"
@@ -14,7 +15,6 @@ import (
1415
"github.com/go-kit/log"
1516
kitlog "github.com/go-kit/log"
1617
"github.com/go-kit/log/level"
17-
"golang.org/x/sync/errgroup"
1818
)
1919

2020
func newCalendarSchedule(
@@ -162,16 +162,18 @@ func cronCalendarEventsForTeam(
162162
// We execute this first to remove any calendar events for a user that is now passing
163163
// policies on one of its hosts, and possibly create a new calendar event if they have
164164
// another failing host on the same team.
165-
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil {
166-
level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err)
167-
}
165+
start := time.Now()
166+
removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger)
167+
level.Debug(logger).Log(
168+
"msg", "passing_hosts", "took", time.Since(start),
169+
)
168170

169171
// Process hosts that are failing calendar policies.
170-
if err := processCalendarFailingHosts(
171-
ctx, ds, calendarConfig, orgName, failingHosts, logger,
172-
); err != nil {
173-
level.Info(logger).Log("msg", "processing failing hosts", "err", err)
174-
}
172+
start = time.Now()
173+
processCalendarFailingHosts(ctx, ds, calendarConfig, orgName, failingHosts, logger)
174+
level.Debug(logger).Log(
175+
"msg", "failing_hosts", "took", time.Since(start),
176+
)
175177

176178
// At last we want to log the hosts that are failing and don't have an associated email.
177179
logHostsWithoutAssociatedEmail(
@@ -190,15 +192,18 @@ func processCalendarFailingHosts(
190192
orgName string,
191193
hosts []fleet.HostPolicyMembershipData,
192194
logger kitlog.Logger,
193-
) error {
195+
) {
194196
hosts = filterHostsWithSameEmail(hosts)
195197

196198
const consumers = 20
197199
hostsCh := make(chan fleet.HostPolicyMembershipData)
198-
g, ctx := errgroup.WithContext(ctx)
200+
var wg sync.WaitGroup
199201

200202
for i := 0; i < consumers; i++ {
201-
g.Go(func() error {
203+
wg.Add(+1)
204+
go func() {
205+
defer wg.Done()
206+
202207
for host := range hostsCh {
203208
logger := log.With(logger, "host_id", host.HostID)
204209

@@ -230,7 +235,8 @@ func processCalendarFailingHosts(
230235

231236
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
232237
if err := userCalendar.Configure(host.Email); err != nil {
233-
return fmt.Errorf("configure user calendar: %w", err)
238+
level.Error(logger).Log("msg", "configure user calendar", "err", err)
239+
continue // continue with next host
234240
}
235241

236242
switch {
@@ -249,19 +255,19 @@ func processCalendarFailingHosts(
249255
continue // continue with next host
250256
}
251257
default:
252-
return fmt.Errorf("get calendar event: %w", err)
258+
level.Error(logger).Log("msg", "get calendar event from db", "err", err)
259+
continue // continue with next host
253260
}
254261
}
255-
return nil
256-
})
262+
}()
257263
}
258264

259265
for _, host := range hosts {
260266
hostsCh <- host
261267
}
262268
close(hostsCh)
263269

264-
return g.Wait()
270+
wg.Wait()
265271
}
266272

267273
func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData {
@@ -472,7 +478,7 @@ func removeCalendarEventsFromPassingHosts(
472478
calendarConfig *fleet.GoogleCalendarIntegration,
473479
hosts []fleet.HostPolicyMembershipData,
474480
logger kitlog.Logger,
475-
) error {
481+
) {
476482
hostIDsByEmail := make(map[string][]uint)
477483
for _, host := range hosts {
478484
hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID)
@@ -491,10 +497,13 @@ func removeCalendarEventsFromPassingHosts(
491497

492498
const consumers = 20
493499
emailsCh := make(chan emailWithHosts)
494-
g, ctx := errgroup.WithContext(ctx)
500+
var wg sync.WaitGroup
495501

496502
for i := 0; i < consumers; i++ {
497-
g.Go(func() error {
503+
wg.Add(+1)
504+
go func() {
505+
defer wg.Done()
506+
498507
for email := range emailsCh {
499508

500509
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email)
@@ -507,23 +516,24 @@ func removeCalendarEventsFromPassingHosts(
507516
case fleet.IsNotFound(err):
508517
continue
509518
default:
510-
return fmt.Errorf("get calendar event from DB: %w", err)
519+
level.Error(logger).Log("msg", "get calendar event from DB", "err", err)
520+
continue
511521
}
512522
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
513523
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
514-
return fmt.Errorf("delete user calendar event: %w", err)
524+
level.Error(logger).Log("msg", "delete user calendar event", "err", err)
525+
continue
515526
}
516527
}
517-
return nil
518-
})
528+
}()
519529
}
520530

521531
for _, emailWithHostIDs := range emails {
522532
emailsCh <- emailWithHostIDs
523533
}
524534
close(emailsCh)
525535

526-
return g.Wait()
536+
wg.Wait()
527537
}
528538

529539
func logHostsWithoutAssociatedEmail(

0 commit comments

Comments
 (0)