Skip to content

Commit e615b19

Browse files
lucasmrodgetvictor
authored andcommitted
Fleet calendar process 100 hosts at a time (#17806)
Add concurrency for #17441.
1 parent b112d66 commit e615b19

File tree

1 file changed

+122
-72
lines changed

1 file changed

+122
-72
lines changed

cmd/fleet/calendar_cron.go

+122-72
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"slices"
78
"time"
89

910
"github.com/fleetdm/fleet/v4/ee/server/calendar"
@@ -13,6 +14,7 @@ import (
1314
"github.com/go-kit/log"
1415
kitlog "github.com/go-kit/log"
1516
"github.com/go-kit/log/level"
17+
"golang.org/x/sync/errgroup"
1618
)
1719

1820
func newCalendarSchedule(
@@ -57,7 +59,6 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L
5759
return nil
5860
}
5961
googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0]
60-
calendar := createUserCalendarFromConfig(ctx, googleCalendarIntegrationConfig, logger)
6162
domain := googleCalendarIntegrationConfig.Domain
6263

6364
teams, err := ds.ListTeams(ctx, fleet.TeamFilter{
@@ -71,7 +72,7 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L
7172

7273
for _, team := range teams {
7374
if err := cronCalendarEventsForTeam(
74-
ctx, ds, calendar, *team, appConfig.OrgInfo.OrgName, domain, logger,
75+
ctx, ds, googleCalendarIntegrationConfig, *team, appConfig.OrgInfo.OrgName, domain, logger,
7576
); err != nil {
7677
level.Info(logger).Log("msg", "events calendar cron", "team_id", team.ID, "err", err)
7778
}
@@ -92,7 +93,7 @@ func createUserCalendarFromConfig(ctx context.Context, config *fleet.GoogleCalen
9293
func cronCalendarEventsForTeam(
9394
ctx context.Context,
9495
ds fleet.Datastore,
95-
calendar fleet.UserCalendar,
96+
calendarConfig *fleet.GoogleCalendarIntegration,
9697
team fleet.Team,
9798
orgName string,
9899
domain string,
@@ -161,13 +162,13 @@ func cronCalendarEventsForTeam(
161162
// We execute this first to remove any calendar events for a user that is now passing
162163
// policies on one of its hosts, and possibly create a new calendar event if they have
163164
// another failing host on the same team.
164-
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendar, passingHosts); err != nil {
165+
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil {
165166
level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err)
166167
}
167168

168169
// Process hosts that are failing calendar policies.
169170
if err := processCalendarFailingHosts(
170-
ctx, ds, calendar, orgName, failingHosts, logger,
171+
ctx, ds, calendarConfig, orgName, failingHosts, logger,
171172
); err != nil {
172173
level.Info(logger).Log("msg", "processing failing hosts", "err", err)
173174
}
@@ -185,67 +186,82 @@ func cronCalendarEventsForTeam(
185186
func processCalendarFailingHosts(
186187
ctx context.Context,
187188
ds fleet.Datastore,
188-
userCalendar fleet.UserCalendar,
189+
calendarConfig *fleet.GoogleCalendarIntegration,
189190
orgName string,
190191
hosts []fleet.HostPolicyMembershipData,
191192
logger kitlog.Logger,
192193
) error {
193194
hosts = filterHostsWithSameEmail(hosts)
194195

195-
for _, host := range hosts {
196-
logger := log.With(logger, "host_id", host.HostID)
197-
198-
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
199-
200-
expiredEvent := false
201-
if err == nil {
202-
if hostCalendarEvent.HostID != host.HostID {
203-
// This calendar event belongs to another host with this associated email,
204-
// thus we skip this entry.
205-
continue // continue with next host
196+
const consumers = 100
197+
hostsCh := make(chan fleet.HostPolicyMembershipData)
198+
g, ctx := errgroup.WithContext(ctx)
199+
200+
for i := 0; i < consumers; i++ {
201+
g.Go(func() error {
202+
for host := range hostsCh {
203+
logger := log.With(logger, "host_id", host.HostID)
204+
205+
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
206+
207+
expiredEvent := false
208+
if err == nil {
209+
if hostCalendarEvent.HostID != host.HostID {
210+
// This calendar event belongs to another host with this associated email,
211+
// thus we skip this entry.
212+
continue // continue with next host
213+
}
214+
if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending {
215+
// This can happen if the host went offline (and never returned results)
216+
// after setting the webhook as pending.
217+
continue // continue with next host
218+
}
219+
now := time.Now()
220+
webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent
221+
if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) {
222+
// If the webhook already fired today and the policies are still failing
223+
// we give a grace period of one day for the host before we schedule a new event.
224+
continue // continue with next host
225+
}
226+
if calendarEvent.EndTime.Before(now) {
227+
expiredEvent = true
228+
}
229+
}
230+
231+
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
232+
if err := userCalendar.Configure(host.Email); err != nil {
233+
return fmt.Errorf("configure user calendar: %w", err)
234+
}
235+
236+
switch {
237+
case err == nil && !expiredEvent:
238+
if err := processFailingHostExistingCalendarEvent(
239+
ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host,
240+
); err != nil {
241+
level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err)
242+
continue // continue with next host
243+
}
244+
case fleet.IsNotFound(err) || expiredEvent:
245+
if err := processFailingHostCreateCalendarEvent(
246+
ctx, ds, userCalendar, orgName, host,
247+
); err != nil {
248+
level.Info(logger).Log("msg", "process failing host create calendar event", "err", err)
249+
continue // continue with next host
250+
}
251+
default:
252+
return fmt.Errorf("get calendar event: %w", err)
253+
}
206254
}
207-
if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending {
208-
// This can happen if the host went offline (and never returned results)
209-
// after setting the webhook as pending.
210-
continue // continue with next host
211-
}
212-
now := time.Now()
213-
webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent
214-
if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) {
215-
// If the webhook already fired today and the policies are still failing
216-
// we give a grace period of one day for the host before we schedule a new event.
217-
continue // continue with next host
218-
}
219-
if calendarEvent.EndTime.Before(now) {
220-
expiredEvent = true
221-
}
222-
}
223-
224-
if err := userCalendar.Configure(host.Email); err != nil {
225-
return fmt.Errorf("configure user calendar: %w", err)
226-
}
255+
return nil
256+
})
257+
}
227258

228-
switch {
229-
case err == nil && !expiredEvent:
230-
if err := processFailingHostExistingCalendarEvent(
231-
ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host,
232-
); err != nil {
233-
level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err)
234-
continue // continue with next host
235-
}
236-
case fleet.IsNotFound(err) || expiredEvent:
237-
if err := processFailingHostCreateCalendarEvent(
238-
ctx, ds, userCalendar, orgName, host,
239-
); err != nil {
240-
level.Info(logger).Log("msg", "process failing host create calendar event", "err", err)
241-
continue // continue with next host
242-
}
243-
default:
244-
return fmt.Errorf("get calendar event: %w", err)
245-
}
259+
for _, host := range hosts {
260+
hostsCh <- host
246261
}
262+
close(hostsCh)
247263

248-
return nil
264+
return g.Wait()
249265
}
250266

251267
func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData {
@@ -437,27 +453,61 @@ func addBusinessDay(date time.Time) time.Time {
437453
func removeCalendarEventsFromPassingHosts(
438454
ctx context.Context,
439455
ds fleet.Datastore,
440-
userCalendar fleet.UserCalendar,
456+
calendarConfig *fleet.GoogleCalendarIntegration,
441457
hosts []fleet.HostPolicyMembershipData,
458+
logger kitlog.Logger,
442459
) error {
460+
hostIDsByEmail := make(map[string][]uint)
443461
for _, host := range hosts {
444-
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
445-
switch {
446-
case err == nil:
447-
if hostCalendarEvent.HostID != host.HostID {
448-
// This calendar event belongs to another host, thus we skip this entry.
449-
continue
462+
hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID)
463+
}
464+
type emailWithHosts struct {
465+
email string
466+
hostIDs []uint
467+
}
468+
emails := make([]emailWithHosts, 0, len(hostIDsByEmail))
469+
for email, hostIDs := range hostIDsByEmail {
470+
emails = append(emails, emailWithHosts{
471+
email: email,
472+
hostIDs: hostIDs,
473+
})
474+
}
475+
476+
const consumers = 100
477+
emailsCh := make(chan emailWithHosts)
478+
g, ctx := errgroup.WithContext(ctx)
479+
480+
for i := 0; i < consumers; i++ {
481+
g.Go(func() error {
482+
for email := range emailsCh {
483+
484+
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email)
485+
switch {
486+
case err == nil:
487+
if ok := slices.Contains(email.hostIDs, hostCalendarEvent.HostID); !ok {
488+
// None of the hosts belong to this calendar event.
489+
continue
490+
}
491+
case fleet.IsNotFound(err):
492+
continue
493+
default:
494+
return fmt.Errorf("get calendar event from DB: %w", err)
495+
}
496+
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
497+
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
498+
return fmt.Errorf("delete user calendar event: %w", err)
499+
}
450500
}
451-
case fleet.IsNotFound(err):
452-
continue
453-
default:
454-
return fmt.Errorf("get calendar event from DB: %w", err)
455-
}
456-
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
457-
return fmt.Errorf("delete user calendar event: %w", err)
458-
}
501+
return nil
502+
})
459503
}
460-
return nil
504+
505+
for _, emailWithHostIDs := range emails {
506+
emailsCh <- emailWithHostIDs
507+
}
508+
close(emailsCh)
509+
510+
return g.Wait()
461511
}
462512

463513
func logHostsWithoutAssociatedEmail(

0 commit comments

Comments
 (0)