NATS develop #220
Conversation
Switch from Redis to NATS (#2)
* Refactor job queue handling to support priority-based processing and update related tests
* Enhance job stream configuration to disallow duplicates
* Add KeyValue store for job management
* Add Bruno files for NATS
Walkthrough
This change migrates the Hades system from Redis/Asynq-based task queueing to a NATS JetStream-based messaging system. It updates all related configuration files, code, and documentation to use NATS for job queuing and processing, introduces new producer/consumer abstractions, and modifies priority handling and volume management in job execution.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant API as HadesAPI
    participant Producer as HadesProducer (NATS JetStream)
    participant NATS as NATS Server
    participant Consumer as HadesConsumer
    participant Scheduler as HadesScheduler
    API->>Producer: EnqueueJobWithPriority(payload, priority)
    Producer->>NATS: Publish job to subject (priority queue)
    Consumer->>NATS: Subscribe to priority subjects
    NATS-->>Consumer: Deliver job message
    Consumer->>Scheduler: ScheduleJob(ctx, payload)
    Scheduler->>Scheduler: Execute job (with optional early volume cleanup)
    Scheduler-->>Consumer: Job result (ack/nack)
```
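For orientation, a minimal Go sketch of that flow. `EnqueueHighJob` and `ScheduleJob` are named elsewhere in this review; the `DequeueJob` method and its callback shape are assumptions for illustration only.

```go
// Illustrative sketch only: EnqueueHighJob and ScheduleJob are named in
// this review's context; DequeueJob and the callback shape are assumed.
func runFlow(ctx context.Context, producer utils.HadesProducer, consumer utils.HadesConsumer,
	scheduler HadesScheduler, job payload.QueuePayload) {
	// HadesAPI side: publish the job onto the high-priority subject.
	if err := producer.EnqueueHighJob(ctx, job); err != nil {
		slog.Error("enqueue failed", "error", err)
		return
	}
	// HadesScheduler side: the consumer hands each delivered job to the
	// scheduler and acks/nacks based on the result.
	consumer.DequeueJob(ctx, func(job payload.QueuePayload) {
		if err := scheduler.ScheduleJob(ctx, job); err != nil {
			slog.Error("job failed", "id", job.ID, "error", err)
		}
	})
}
```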
…#219)
* update bruno script to show failure if it fails
* add shared volumes to fix "folder not found error"
* ensure debug mode logs deletion of volume
* implement delete volume after creation before execution flag
Actionable comments posted: 11
🔭 Outside diff range comments (3)
docker-comose.k8s.yml (1)
1-1: ⚠️ Potential issue
Filename appears misspelled – will be ignored by most tooling
`docker-comose.k8s.yml` should probably be `docker-compose.k8s.yml`. Kubernetes/Compose-aware automation and developers searching the repo will not discover the file under the current name.

```bash
mv docker-comose.k8s.yml docker-compose.k8s.yml
```
ansible/hades/templates/docker-compose-api.yml.j2 (1)
31-35: ⚠️ Potential issue
Health-check uses `curl`, which is absent in the official `nats:latest` image
The Alpine-based NATS image contains only `nats-server`; the health-check will continuously fail and keep the container in `starting` state. Replace with a shell+HTTP tool baked into the image (e.g. `wget`) or use the NATS CLI:

```diff
- test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"]
+ test: ["CMD-SHELL", "wget -qO- http://localhost:8222/healthz || exit 1"]
```

Alternatively, mount a tiny busybox image or rely on the built-in `nats-server --signal=healthz` probe.
HadesAPI/router.go (1)
39-44: 🛠️ Refactor suggestion
Use `ShouldBindJSON` instead of generic `ShouldBind`
The API only expects a JSON body. `ShouldBind` will also parse query/form data, which can unintentionally override JSON fields and complicates request validation.

```diff
- if err := c.ShouldBind(&payload); err != nil {
+ if err := c.ShouldBindJSON(&payload); err != nil {
```
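As a concrete illustration of the difference, a hedged Gin handler sketch — the payload type below is a stand-in, not the project's actual struct:

```go
import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// RestPayload stands in for HadesAPI's real request type.
type RestPayload struct {
	Priority int `json:"priority"`
}

func AddBuildJob(c *gin.Context) {
	var payload RestPayload
	// ShouldBindJSON reads only the request body, so query or form
	// parameters cannot silently override JSON fields the way the
	// generic ShouldBind binding selection can.
	if err := c.ShouldBindJSON(&payload); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, payload)
}
```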
🧹 Nitpick comments (18)
ansible/hades/README.md (2)
35-37: Example uses plain-text secrets
Committing real (or realistic) passwords in docs encourages bad practice. Consider placeholder values or reference to Ansible Vault.
49-51: Duplicate NATS variables – DRY the docs
The same block appears twice; factor it into a shared snippet or note to reduce doc drift.
docs/api/Create Build Job (long-running-debug).bru (2)
16-22: emptyDir volume needs explicit medium/size in some clusters
Several managed Kubernetes offerings reject an unqualified emptyDir on security grounds. Consider specifying `{"name":"shared","emptyDir":{"sizeLimit":"1Gi"}}`.
32-36: Mount path & workingDir hard-coded – leak risk when reused
If another step mounts the same volume at a different path, absolute `/shared` references will break. Recommend deriving the work dir via an env var to decouple.
shared/utils/prio_test.go (1)
12-22: Add boundary-value cases to the priority table
The mapping is correct, but tests skip edge inputs such as the `3 → High` lower bound (`>=3`) and an upper bound above the documented range (e.g., `99`). Including these guards regressions cheaply.

```diff
- {3, HighPriority},
+ {3, HighPriority},  // lower bound of High
+ {99, HighPriority}, // arbitrary high out-of-band value
```
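Expanded into a full table-driven test, the suggestion might look like this — a sketch in the style of prio_test.go; the test name is illustrative, while `PrioFromInt` and the priority constants are named elsewhere in this review:

```go
func TestPrioFromIntBounds(t *testing.T) {
	cases := []struct {
		in   int
		want Priority
	}{
		{0, LowPriority},   // bottom of the documented range
		{1, LowPriority},
		{2, MediumPriority},
		{3, HighPriority},  // lower bound of High
		{99, HighPriority}, // arbitrary out-of-band value
	}
	for _, c := range cases {
		if got := PrioFromInt(c.in); got != c.want {
			t.Errorf("PrioFromInt(%d) = %v, want %v", c.in, got, c.want)
		}
	}
}
```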
shared/utils/config.go (2)
12-17: Consider namespacing TLS flag and credentials
`NATS_URL`, `NATS_USERNAME`, `NATS_PASSWORD`, and `NATS_TLS_ENABLED` are fine, but the boolean is parsed from a string and the tag lacks `notEmpty` like the URL. You might also expose multiple URLs (comma-separated) for clustering later. Optionally:

```diff
-type NatsConfig struct {
-	URL      string `env:"NATS_URL,notEmpty" envDefault:"nats://localhost:4222"`
-	Username string `env:"NATS_USERNAME"`
-	Password string `env:"NATS_PASSWORD"`
-	TLS      bool   `env:"NATS_TLS_ENABLED" envDefault:"false"`
+type NatsConfig struct {
+	URLs       []string `env:"NATS_URLS,notEmpty" envSeparator:"," envDefault:"nats://localhost:4222"`
+	Username   string   `env:"NATS_USERNAME"`
+	Password   string   `env:"NATS_PASSWORD"`
+	TLSEnabled bool     `env:"NATS_TLS_ENABLED" envDefault:"false"`
 }
```

This avoids confusion when the config expands to clusters.
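For reference, the struct tags above follow the caarlos0/env conventions (`envSeparator`, `envDefault`); a self-contained sketch of how the clustered variant would parse, assuming that library (here imported as v11) is what loads the config:

```go
package main

import (
	"fmt"
	"log"

	"github.com/caarlos0/env/v11"
)

// NatsConfig mirrors the suggested clustered shape; envSeparator
// splits NATS_URLS on commas into the slice.
type NatsConfig struct {
	URLs       []string `env:"NATS_URLS,notEmpty" envSeparator:"," envDefault:"nats://localhost:4222"`
	Username   string   `env:"NATS_USERNAME"`
	Password   string   `env:"NATS_PASSWORD"`
	TLSEnabled bool     `env:"NATS_TLS_ENABLED" envDefault:"false"`
}

func main() {
	var cfg NatsConfig
	if err := env.Parse(&cfg); err != nil {
		log.Fatalf("config: %v", err)
	}
	// NATS_URLS=nats://n1:4222,nats://n2:4222 would yield two entries.
	fmt.Println(cfg.URLs)
}
```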
22-24: `CLEANUP` env var naming is too generic
`CLEANUP` collides easily with other tooling. Prefixing with project context improves clarity:

```diff
- CleanupSharedVolumes bool `env:"CLEANUP" envDefault:"false"`
+ CleanupSharedVolumes bool `env:"HADES_CLEANUP_SHARED_VOLUMES" envDefault:"false"`
```

docs/api/Create Build Job (Test Succeed - Result Container).bru (1)
31-36: Minor: metadata paths still use relative notation
Now that the repository is cloned into `/shared`, consider converting the `HADES_*_PATH` values from `./example...` to absolute paths (`/shared/example...`) for symmetry with the `workingDir` change. Purely cosmetic, but it avoids double-checking the working directory downstream.
HadesAPI/router_test.go (1)
84-88: Graceful shutdown – drain before close
To ensure in-flight publishes/acks are flushed, call `suite.natsConnection.Drain()` before `Close()`:

```diff
- if suite.natsConnection != nil {
-     suite.natsConnection.Close()
+ if suite.natsConnection != nil {
+     _ = suite.natsConnection.Drain()
+     suite.natsConnection.Close()
  }
```

Not strictly required for these tests but avoids flaky behaviour when assertions depend on JetStream state.
HadesAPI/router.go (3)
48-54: Early-return check duplicates `if step.MemoryLimit != ""`
`utils.ParseMemoryLimit` already returns an error for an empty string, so the outer condition is redundant and hides potential “missing limit” validation in future. Consider simplifying:

```diff
- if step.MemoryLimit != "" {
-     if _, err := utils.ParseMemoryLimit(step.MemoryLimit); err != nil {
+ if _, err := utils.ParseMemoryLimit(step.MemoryLimit); err != nil {
```

68-70: Remove stale commented code
The commented block referencing `queuePriority` is obsolete and clutters the function.
71-75: Prefer structured logrus calls over `Printf`
Using `log.Printf` drops structured fields and is inconsistent with the rest of the file.

```diff
- log.Printf(" [*] Successfully enqueued job: %s", payload.QueuePayload.ID.String())
+ log.Infof("Successfully enqueued job: %s", payload.QueuePayload.ID)
```

HadesAPI/main.go (2)
35-42: `log.Fatalf` already exits; the subsequent `return` is unreachable
Both fatal blocks include a `return` that will never run because `Fatalf` terminates the process.

```diff
- log.Fatalf("Failed to connect to NATS: %v", err)
- return
+ log.Fatalf("Failed to connect to NATS: %v", err)
```

43-48: Consider injecting `HadesProducer` instead of using a global
Globals make testing and parallelism harder. Since `setupRouter` is called right after, you could pass the producer to it (or wrap it in a context) and avoid a package-level mutable variable.
ansible/hades/defaults/main.yml (1)
16-19: Expose NATS credentials via Ansible vault / env-secret variables
Plain-text defaults for `hades_nats_username` and `hades_nats_password` can leak when playbooks are committed. Recommend marking them `vault_` or instructing users to set them through encrypted vars files.
compose.yml (1)
16-20: Consider adding NATS authentication.
The NATS service is configured without authentication. For production deployments, consider adding username/password authentication.
Add authentication environment variables:

```diff
 environment:
-  - NATS_URL=nats://nats:4222
+  - NATS_URL=nats://nats:4222
+  - NATS_USERNAME=${NATS_USERNAME:-}
+  - NATS_PASSWORD=${NATS_PASSWORD:-}
```

And update the NATS service command:

```diff
- command: ["-js", "-m", "8222"]
+ command: ["-js", "-m", "8222", "--user", "${NATS_USERNAME:-}", "--pass", "${NATS_PASSWORD:-}"]
```

Also applies to: 33-36
shared/utils/queue.go (2)
147-157: Fix typo in parameter name.
The parameter name `queuePayloud` should be `queuePayload`.

```diff
-func (hp HadesProducer) EnqueueMediumJob(ctx context.Context, queuePayloud payload.QueuePayload) error {
-	return hp.EnqueueJobWithPriority(ctx, queuePayloud, MediumPriority)
+func (hp HadesProducer) EnqueueMediumJob(ctx context.Context, queuePayload payload.QueuePayload) error {
+	return hp.EnqueueJobWithPriority(ctx, queuePayload, MediumPriority)
 }
-func (hp HadesProducer) EnqueueHighJob(ctx context.Context, queuePayloud payload.QueuePayload) error {
-	return hp.EnqueueJobWithPriority(ctx, queuePayloud, HighPriority)
+func (hp HadesProducer) EnqueueHighJob(ctx context.Context, queuePayload payload.QueuePayload) error {
+	return hp.EnqueueJobWithPriority(ctx, queuePayload, HighPriority)
 }
-func (hp HadesProducer) EnqueueLowJob(ctx context.Context, queuePayloud payload.QueuePayload) error {
-	return hp.EnqueueJobWithPriority(ctx, queuePayloud, LowPriority)
+func (hp HadesProducer) EnqueueLowJob(ctx context.Context, queuePayload payload.QueuePayload) error {
+	return hp.EnqueueJobWithPriority(ctx, queuePayload, LowPriority)
 }
```
234-262: Add error logging for Nak operations.
While not critical, logging Nak errors can help with debugging message processing issues.

```diff
 if err != nil {
 	slog.Error("Failed to parse message ID", "error", err, "data", string(msg.Data()))
-	msg.Nak() // Negative acknowledgment, message will be redelivered
+	if nakErr := msg.Nak(); nakErr != nil {
+		slog.Error("Failed to negative acknowledge message", "error", nakErr)
+	}
 	return
 }

 entry, err := hc.kv.Get(ctx, msg_id.String())
 if err != nil {
 	slog.Error("Failed to get message from KeyValue store", "error", err, "id", msg_id.String())
-	msg.Nak() // Negative acknowledgment, message will be redelivered
+	if nakErr := msg.Nak(); nakErr != nil {
+		slog.Error("Failed to negative acknowledge message", "error", nakErr)
+	}
 	return
 }
 // Process the message
 var job payload.QueuePayload
 if err := json.Unmarshal(entry.Value(), &job); err != nil {
 	slog.Error("Failed to unmarshal message payload", "error", err, "data", string(msg.Data()))
-	msg.Nak() // Negative acknowledgment, message will be redelivered
+	if nakErr := msg.Nak(); nakErr != nil {
+		slog.Error("Failed to negative acknowledge message", "error", nakErr)
+	}
 	return
 }
```
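A small helper makes the pattern reusable across all three call sites — a sketch, assuming the new `jetstream.Msg` interface, whose `Nak()` returns an error:

```go
// nakWithLog negatively acknowledges a message and logs, rather than
// drops, any failure to do so.
func nakWithLog(msg jetstream.Msg, reason string) {
	if err := msg.Nak(); err != nil {
		slog.Error("failed to NAK message", "reason", reason, "error", err)
	}
}
```

Each error branch then shrinks to a single `nakWithLog(msg, "...")` call.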
🧰 Tools
🪛 golangci-lint (1.64.8)
235-235: Error return value of `msg.Nak` is not checked (errcheck)
242-242: Error return value of `msg.Nak` is not checked (errcheck)
249-249: Error return value of `msg.Nak` is not checked (errcheck)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
HadesAPI/go.sum is excluded by `!**/*.sum`
HadesCloneContainer/go.sum is excluded by `!**/*.sum`
HadesScheduler/go.sum is excluded by `!**/*.sum`
go.work.sum is excluded by `!**/*.sum`
shared/go.sum is excluded by `!**/*.sum`
📒 Files selected for processing (33)
.env.example (1 hunks)
.vscode/tasks.json (1 hunks)
HadesAPI/go.mod (4 hunks)
HadesAPI/main.go (3 hunks)
HadesAPI/monitoring.go (1 hunks)
HadesAPI/router.go (1 hunks)
HadesAPI/router_test.go (3 hunks)
HadesCloneContainer/go.mod (1 hunks)
HadesScheduler/docker/docker.go (4 hunks)
HadesScheduler/docker/volume.go (1 hunks)
HadesScheduler/go.mod (2 hunks)
HadesScheduler/main.go (3 hunks)
ansible/hades/README.md (3 hunks)
ansible/hades/defaults/main.yml (1 hunks)
ansible/hades/templates/docker-compose-api.yml.j2 (1 hunks)
ansible/hades/templates/docker-compose-scheduler.yml.j2 (1 hunks)
ansible/hades/templates/hades.env.j2 (1 hunks)
compose.yml (2 hunks)
docker-comose.k8s.yml (1 hunks)
docker-compose.dev.yml (1 hunks)
docs/NATS/Get streams.bru (1 hunks)
docs/NATS/folder.bru (1 hunks)
docs/api/Create Build Job (Test Fail).bru (3 hunks)
docs/api/Create Build Job (Test Succeed - Result Container).bru (3 hunks)
docs/api/Create Build Job (Test Succeed).bru (2 hunks)
docs/api/Create Build Job (long-running-debug).bru (2 hunks)
docs/bruno.json (1 hunks)
docs/environments/Local-Development.bru (1 hunks)
shared/go.mod (1 hunks)
shared/utils/config.go (1 hunks)
shared/utils/prio.go (1 hunks)
shared/utils/prio_test.go (1 hunks)
shared/utils/queue.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (7)
shared/utils/prio_test.go (1)
- shared/utils/prio.go (4): Priority (5-5), HighPriority (8-8), MediumPriority (9-9), LowPriority (10-10)
HadesAPI/monitoring.go (1)
- shared/payload/payload.go (1): QueuePayload (15-21)
shared/utils/prio.go (1)
- shared/utils/queue.go (1): NatsSubject (18-18)
HadesAPI/router_test.go (3)
- HadesAPI/main.go (1): HadesProducer (25-25)
- shared/utils/queue.go (3): HadesProducer (22-26), SetupNatsConnection (36-63), NewHadesProducer (66-102)
- shared/utils/config.go (1): NatsConfig (12-17)
HadesAPI/main.go (3)
- shared/utils/config.go (1): NatsConfig (12-17)
- shared/utils/queue.go (3): HadesProducer (22-26), SetupNatsConnection (36-63), NewHadesProducer (66-102)
- HadesScheduler/main.go (1): NatsConnection (16-16)
HadesScheduler/main.go (3)
- shared/utils/config.go (1): NatsConfig (12-17)
- shared/utils/queue.go (3): HadesConsumer (28-33), SetupNatsConnection (36-63), NewHadesConsumer (104-145)
- shared/payload/payload.go (1): QueuePayload (15-21)
shared/utils/queue.go (5)
- shared/utils/prio.go (5): Priority (5-5), HighPriority (8-8), MediumPriority (9-9), LowPriority (10-10), PrioritySubject (14-16)
- shared/utils/config.go (1): NatsConfig (12-17)
- HadesAPI/main.go (1): HadesProducer (25-25)
- HadesScheduler/main.go (1): HadesConsumer (30-30)
- shared/payload/payload.go (1): QueuePayload (15-21)
🪛 dotenv-linter (3.3.0)
.env.example
[warning] 2-2: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 3-3: [SpaceCharacter] The line has spaces around equal sign
[warning] 3-3: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 4-4: [SpaceCharacter] The line has spaces around equal sign
[warning] 4-4: [UnorderedKey] The NATS_PASSWORD key should go before the NATS_URL key
[warning] 4-4: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 5-5: [UnorderedKey] The NATS_TLS_ENABLED key should go before the NATS_URL key
[warning] 5-5: [ValueWithoutQuotes] This value needs to be surrounded in quotes
🪛 ast-grep (0.38.1)
shared/utils/queue.go
[warning] 50-50: `MinVersion` is missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported and insecure browsers (such as Internet Explorer 10) may it be necessary to enable TLS 1.0 to provide support. Add `MinVersion: tls.VersionTLS13` to the TLS configuration to bump the minimum version to TLS 1.3.
Context: tls.Config{}
Context: tls.Config{}
Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures
(missing-ssl-minversion-go)
🪛 golangci-lint (1.64.8)
shared/utils/queue.go
166-166: Error return value of `hp.kv.Put` is not checked (errcheck)
235-235: Error return value of `msg.Nak` is not checked (errcheck)
242-242: Error return value of `msg.Nak` is not checked (errcheck)
249-249: Error return value of `msg.Nak` is not checked (errcheck)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: build (./HadesScheduler/Dockerfile, ls1intum/hades/hades-scheduler) / Build Docker Image for ls1intum/hades/hades-scheduler
- GitHub Check: build (./HadesScheduler/Dockerfile, ls1intum/hades/hades-scheduler) / Build Docker Image for ls1intum/hades/hades-scheduler
- GitHub Check: build (./HadesCloneContainer/Dockerfile, ls1intum/hades/hades-clone-container) / Build Docker Image for ls1intum/hades/hades-clone-container
- GitHub Check: build (./HadesAPI/Dockerfile, ls1intum/hades/hades-api) / Build Docker Image for ls1intum/hades/hades-api
- GitHub Check: build (./HadesCloneContainer/Dockerfile, ls1intum/hades/hades-clone-container) / Build Docker Image for ls1intum/hades/hades-clone-container
- GitHub Check: build (./HadesAPI/Dockerfile, ls1intum/hades/hades-api) / Build Docker Image for ls1intum/hades/hades-api
🔇 Additional comments (21)
docs/bruno.json (1)
3-3: Update collection name to reflect NATS migration
The “name” field is correctly updated to “HadesCI NATS” to align with the switch from Redis to NATS. This change is consistent with the overall PR objectives.
HadesCloneContainer/go.mod (2)
32-32: Approve bump of golang.org/x/crypto to v0.37.0
This upgrade aligns with the broader dependency migration and pulls in the latest security and bug fixes. Please run `go mod tidy` and verify the build and tests pass.
34-34: Approve bump of golang.org/x/sys to v0.32.0
Updating this indirect dependency keeps consistency with other modules and incorporates upstream improvements. Ensure you’ve run `go mod tidy` and all checks remain green.
docs/environments/Local-Development.bru (1)
2-2: Ensure consistency in local development port configuration.
This update changes `hostname` from `localhost:8081` to `localhost:8080`; verify that all Docker Compose services, startup scripts, and other environment references have been updated to listen on port 8080.
docs/NATS/folder.bru (1)
1-4: Add NATS metadata block.
The new metadata entries correctly sequence and identify the NATS documentation, aligning with the broader migration plan.
HadesScheduler/docker/volume.go (1)
33-33: Log deletion of Docker volumes.
Adding a debug statement after successful volume removal improves observability and aligns with the existing create‐volume logging.
.vscode/tasks.json (1)
5-8: Update VSCode task to start NATS container.
Renamed the task label and updated the command to boot up NATS, matching the shift from Redis to NATS JetStream.
.env.example (1)
1-5: Introduce NATS environment variables.
Replacing Redis‐related settings with NATS connection options is consistent with the migration to JetStream as the messaging backend.
🧰 Tools
🪛 dotenv-linter (3.3.0)
[warning] 2-2: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 3-3: [SpaceCharacter] The line has spaces around equal sign
[warning] 3-3: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 4-4: [SpaceCharacter] The line has spaces around equal sign
[warning] 4-4: [UnorderedKey] The NATS_PASSWORD key should go before the NATS_URL key
[warning] 4-4: [ValueWithoutQuotes] This value needs to be surrounded in quotes
[warning] 5-5: [UnorderedKey] The NATS_TLS_ENABLED key should go before the NATS_URL key
[warning] 5-5: [ValueWithoutQuotes] This value needs to be surrounded in quotes
docker-compose.dev.yml (1)
7-7: Switch services to use NATS_URL.
Both `hadesAPI` and `hadesScheduler` now reference `NATS_URL`, reflecting the transition from Redis to NATS in development compose.
Also applies to: 11-11
ansible/hades/templates/hades.env.j2 (1)
1-4: ✅ Verification successful
Verify variable names match the Go config loader
Env keys were introduced here, but if the application expects e.g. `NATS_USER` or `NATS_SECURE`, it will silently ignore these values.
🏁 Script executed:

```bash
#!/bin/bash
# Check for hard-coded look-ups of the new vars.
rg -n $'NATS_(URL|USERNAME|PASSWORD|TLS_ENABLED)'
```

Length of output: 1782

Variable names confirmed consistent with Go config loader
The environment variable names in ansible/hades/templates/hades.env.j2 (`NATS_URL`, `NATS_USERNAME`, `NATS_PASSWORD`, `NATS_TLS_ENABLED`) exactly match the `env` tags in shared/utils/config.go. No changes required.
ansible/hades/README.md (1)
4-4: Minor spelling nit – “expects”

```diff
-This role expexts the following packages to be installed on the target machine:
+This role expects the following packages to be installed on the target machine:
```

docs/NATS/Get streams.bru (1)
7-11: JetStream monitoring endpoint is unauthenticated – caution for prod
Port 8222 exposes a wide range of metrics and queue state. Make sure this is fire-walled or protected behind auth; otherwise anyone on the network can enumerate streams & consumers.
HadesScheduler/go.mod (1)
13-13: ✅ Verification successful
Align NATS versions across all Go modules
`github.com/nats-io/nats.go v1.42.0` is introduced here, but the API or shared modules might pull a different minor version, which will force Go to pick one and could lead to subtle incompatibilities. Run a quick cross-module check; if the versions diverge, bump them to the same tag and run `go mod tidy` in each module.
Also applies to: 48-49
🏁 Script executed:

```bash
#!/bin/bash
# Ensure every go.mod pins the same nats.go version.
fd go.mod | while read f; do
  echo "$f"
  grep -E 'github.com/nats-io/nats.go' "$f"
done
```

Length of output: 577

No NATS version divergence detected
All modules that reference github.com/nats-io/nats.go are pinned to v1.42.0 (HadesAPI/go.mod, HadesScheduler/go.mod, shared/go.mod). HadesCloneContainer doesn’t import NATS directly. No changes required.
docs/api/Create Build Job (Test Succeed).bru (2)
26-26: Priority 1 is now “low” — deliberate downgrade?
With the new 3-level scheme, `priority: 1` enqueues the build at the lowest tier. Previous examples used `3` (= high). Double-check that the reduced priority is intentional; otherwise bump back to `3` to keep earlier behaviour.
16-21: Volume & working-dir consistency looks good
The added shared volume and absolute paths solve earlier workspace issues; mounting it identically in both steps prevents “file not found” errors. 👍
Also applies to: 32-36, 54-57, 58-58
HadesAPI/router.go (1)
59-62: Priority mapping: default “3” now becomes `HighPriority`
`PrioFromInt` treats `>=3` as high, but the handler sets the default to `3`, effectively marking all unspecified jobs as high-priority. Verify this change is intended; it alters queue routing semantics after the Redis→NATS migration.
shared/utils/prio.go (1)
18-25: Priority mapping may be counter-intuitive
With the current switch, `3` maps to `HighPriority`, `2` to `MediumPriority`, and `1`/`0` to `LowPriority`. Historically many systems treat “1” as highest. Confirm this matches business rules; otherwise consider reversing the scale or documenting clearly. If the default API priority is `3`, every job will be high priority.
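A sketch of the mapping as described; the actual shared/utils/prio.go may differ in detail:

```go
func PrioFromInt(prio int) Priority {
	switch {
	case prio >= 3:
		return HighPriority
	case prio == 2:
		return MediumPriority
	default: // 1, 0, and anything below fall through to Low
		return LowPriority
	}
}
```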
HadesScheduler/docker/docker.go (1)
93-101: Good implementation of the cleanup feature.
The warning log appropriately alerts users about this destructive operation.
HadesScheduler/main.go (2)
45-58: Well-structured NATS connection setup.
The error handling and connection management are properly implemented with appropriate logging and cleanup.
75-86: Good implementation of job processing.
The callback-based job processing with proper error handling and logging is well implemented.
shared/utils/queue.go (1)
170-272: Well-designed priority queue consumer implementation.
The implementation demonstrates good practices with worker pool management, context cancellation, and priority-based message processing. The graceful shutdown mechanism ensures in-progress jobs complete before termination.
🧰 Tools
🪛 golangci-lint (1.64.8)
235-235: Error return value of `msg.Nak` is not checked (errcheck)
242-242: Error return value of `msg.Nak` is not checked (errcheck)
249-249: Error return value of `msg.Nak` is not checked (errcheck)
See inline
Actionable comments posted: 5
♻️ Duplicate comments (1)
shared/utils/queue.go (1)
50-55: TLS minimum version should be 1.3, not 1.2
Previous feedback already highlighted this. TLS 1.2 is acceptable but TLS 1.3 is the contemporary baseline and recommended by most security guidelines. Staying on 1.2 forfeits a number of security and performance benefits (0-RTT, improved ciphers). Please bump to `tls.VersionTLS13` unless you have demonstrable legacy-client requirements.

```diff
- tlsConfig := &tls.Config{
-     MinVersion: tls.VersionTLS12, // Ensure TLS 1.2 or higher
- }
+ tlsConfig := &tls.Config{
+     MinVersion: tls.VersionTLS13,
+ }
```
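For context, the hardened configuration plugs into the connection like this — a sketch; `nats.Secure` and `nats.UserInfo` are standard nats.go connect options, while the `cfg` fields follow shared/utils/config.go:

```go
tlsConfig := &tls.Config{
	MinVersion: tls.VersionTLS13, // refuse anything older than TLS 1.3
}
nc, err := nats.Connect(
	cfg.URL,
	nats.Secure(tlsConfig),
	nats.UserInfo(cfg.Username, cfg.Password),
)
if err != nil {
	return nil, fmt.Errorf("connecting to NATS: %w", err)
}
```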
🧹 Nitpick comments (1)
shared/utils/queue.go (1)
178-205: Worker-spawn loop tight-spins and can flood the scheduler
The outer `for {}` continuously tries to enqueue a goroutine, even when all queues are empty. Result: thousands of short-lived goroutines doing nothing but sleeping, wasting CPU. Instead, let the consumer block until a message arrives (e.g. `Fetch(1, jetstream.MaxWait(time.Second))`) and reuse the same worker goroutine, or back off in the outer loop before spawning a new worker. This also simplifies the semaphore logic – a bounded worker pool reading from a shared `msgs` channel is easier to reason about.
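One way to realize that suggestion is a fixed pool of workers, each blocking on `Fetch` — a sketch; in the new jetstream package the bounded-wait option is spelled `jetstream.FetchMaxWait`, and the handler callback is illustrative:

```go
// consumeLoop runs as one goroutine per worker: it blocks on Fetch
// instead of hot-spinning, so an idle queue costs a parked goroutine
// rather than CPU.
func consumeLoop(ctx context.Context, cons jetstream.Consumer, handle func(jetstream.Msg)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		// Waits up to one second for a message before returning an
		// empty batch, then loops again.
		batch, err := cons.Fetch(1, jetstream.FetchMaxWait(time.Second))
		if err != nil {
			slog.Error("fetch failed", "error", err)
			continue
		}
		for msg := range batch.Messages() {
			handle(msg)
		}
	}
}
```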
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
HadesAPI/go.sum is excluded by `!**/*.sum`
HadesScheduler/go.sum is excluded by `!**/*.sum`
go.work is excluded by `!**/*.work`
go.work.sum is excluded by `!**/*.sum`
📒 Files selected for processing (8)
HadesAPI/go.mod (3 hunks)
HadesAPI/monitoring.go (1 hunks)
HadesAPI/router.go (1 hunks)
HadesAPI/router_test.go (3 hunks)
HadesScheduler/docker/docker.go (4 hunks)
HadesScheduler/go.mod (4 hunks)
compose.yml (2 hunks)
shared/utils/queue.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- HadesAPI/router.go
- HadesAPI/router_test.go
- HadesAPI/monitoring.go
- HadesScheduler/docker/docker.go
- HadesAPI/go.mod
🧰 Additional context used
🧬 Code Graph Analysis (1)
shared/utils/queue.go (5)
- shared/utils/prio.go (5): Priority (5-5), HighPriority (8-8), MediumPriority (9-9), LowPriority (10-10), PrioritySubject (14-16)
- shared/utils/config.go (1): NatsConfig (12-17)
- HadesAPI/main.go (1): HadesProducer (25-25)
- HadesScheduler/main.go (1): HadesConsumer (30-30)
- shared/payload/payload.go (1): QueuePayload (15-21)
🪛 golangci-lint (1.64.8)
shared/utils/queue.go
242-242: Error return value of `msg.Nak` is not checked (errcheck)
249-249: Error return value of `msg.Nak` is not checked (errcheck)
256-256: Error return value of `msg.Nak` is not checked (errcheck)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: build (./HadesCloneContainer/Dockerfile, ls1intum/hades/hades-clone-container) / Build Docker Image for ls1intum/hades/hades-clone-container
- GitHub Check: build (./HadesCloneContainer/Dockerfile, ls1intum/hades/hades-clone-container) / Build Docker Image for ls1intum/hades/hades-clone-container
- GitHub Check: build (./HadesScheduler/Dockerfile, ls1intum/hades/hades-scheduler) / Build Docker Image for ls1intum/hades/hades-scheduler
- GitHub Check: build (./HadesScheduler/Dockerfile, ls1intum/hades/hades-scheduler) / Build Docker Image for ls1intum/hades/hades-scheduler
- GitHub Check: build (./HadesAPI/Dockerfile, ls1intum/hades/hades-api) / Build Docker Image for ls1intum/hades/hades-api
- GitHub Check: build (./HadesAPI/Dockerfile, ls1intum/hades/hades-api) / Build Docker Image for ls1intum/hades/hades-api
🔇 Additional comments (11)
compose.yml (1)
48-49: Verify healthcheck command dependencies
The healthcheck uses `curl`, which may not be present in the official NATS image. Confirm that `curl` is installed or switch to a supported command (e.g., `CMD-SHELL` with `wget --spider` or the `nats` CLI) to avoid silent healthcheck failures.
HadesScheduler/go.mod (10)
3-5: Upgrade Go version and toolchain
Go version was bumped to 1.24.0 with toolchain 1.24.4—ensure your CI, Docker images, and local development environments are updated accordingly to avoid build issues.
40-41: Added Containerd error definitions
Pinning `github.com/containerd/errdefs` (and its pkg) enables richer error comparisons in your scheduler. Ensure these are leveraged in your new NATS-based error handling paths.
49-50: Include NATS indirect dependencies
`nkeys` and `nuid` are required for NATS JetStream authentication and unique ID generation—this aligns with the migration to NATS.
111-113: Upgrade Kubernetes client libraries to v0.33.1
Bumping `k8s.io/api`, `k8s.io/apimachinery`, and `k8s.io/client-go`—verify compatibility with your cluster’s API version to avoid runtime mismatches.
17-17: Auto-updated `golang.org/x/exp` and `golang.org/x/crypto` versions—no manual action needed unless specific experimental features are required.
Also applies to: 36-36
44-44: Indirect dependencies (`fxamacker/cbor`, `go-restful`, `logr`, `jsonpointer`, `swag`, `samber/lo`) were refreshed automatically—no concerns at this time.
Also applies to: 74-74, 76-76, 78-78, 80-80, 98-98
101-101: OpenTelemetry and message-packing libraries updated—monitor instrumentation tests for any regressions.
Also applies to: 102-103, 105-106
107-107: Standard library extensions (`x/sys`, `x/text`, `x/time`) have been bumped—no action required.
Also applies to: 108-108, 109-109
115-115: Kubernetes auxiliary modules (`kube-openapi`, `utils`, `structured-merge-diff`) updated indirectly—these are low-risk version bumps.
Also applies to: 116-116, 118-118
10-13: Direct dependencies updated for NATS migration
The `asynq` library has been removed and replaced by `nats.go v1.43.0`, alongside updates to Docker and Fluent Logger. Confirm that all task-queue logic has transitioned to NATS and no residual Asynq imports remain. Run:

```bash
rg -L "github.com/hibiken/asynq"
```

to verify no leftover imports.
```diff
+    nats:
+      condition: service_healthy
     environment:
-      - REDIS_ADDR=redis:6379
-      - REDIS_PWD
+      - NATS_URL=nats://nats:4222
```
Invalid `depends_on` condition in Compose v3.7
Compose file version '3.7' does not support the `condition: service_healthy` field under `depends_on`. These conditions will be ignored, and the services may start before NATS is ready. Consider one of the following:
- Downgrade to Compose file version '2.1' to leverage health-based `depends_on`.
- Remove the `condition` block and add a startup wait script or use a healthcheck-aware entrypoint in the container.
🤖 Prompt for AI Agents
In compose.yml around lines 16 to 19, the use of 'condition: service_healthy'
under 'depends_on' is invalid for Compose file version 3.7 and will be ignored.
To fix this, either downgrade the Compose file version to 2.1 to support
health-based conditions or remove the 'condition' block entirely and implement a
startup wait script or a healthcheck-aware entrypoint in the NATS container to
ensure it is ready before dependent services start.
```diff
+    nats:
+      condition: service_healthy
     environment:
-      - REDIS_ADDR=redis:6379
-      - REDIS_PWD
+      - NATS_URL=nats://nats:4222
       - HADES_EXECUTOR=docker
```
Invalid `depends_on` condition in Compose v3.7
As above, the `condition: service_healthy` under `depends_on` for `hadesScheduler` is not supported in v3.7 and will be ignored. Please adjust the Compose version or implement an alternative startup-wait mechanism.
🤖 Prompt for AI Agents
In compose.yml around lines 33 to 37, the use of 'condition: service_healthy'
under 'depends_on' is invalid in Docker Compose version 3.7 and will be ignored.
To fix this, either upgrade the Compose file version to 3.8 or higher where this
condition is supported, or remove the condition and implement an alternative
startup-wait mechanism such as a wait-for-it script or healthcheck-based retry
logic in the service startup commands.
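If neither Compose change is attractive, the dependent services can also poll the NATS monitoring endpoint themselves at startup. A hedged Go sketch: the service name `nats` and port 8222 match this compose file, everything else is illustrative.

```go
// waitForNats blocks until the NATS monitoring endpoint reports
// healthy, or the context is cancelled.
func waitForNats(ctx context.Context) error {
	const healthz = "http://nats:8222/healthz"
	for {
		resp, err := http.Get(healthz)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
			// retry once per second until healthy
		}
	}
}
```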
```diff
   nats:
     image: nats:2.11.4
     command: ["-js", "-m", "8222"]
     networks:
       - hades
-    environment:
-      - REDIS_PWD
     ports:
       - "4222:4222"
       - "8222:8222"
     healthcheck:
-      test: [ "CMD", "redis-cli", "ping" ]
+      test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"]
       interval: 10s
       timeout: 5s
-      retries: 5
-      start_period: 30s
+      retries: 3

   fluentbit:
     container_name: fluentbit
     image: fluent/fluent-bit
     networks:
       - hades
     ports:
       - "24224:24224"
     command: /fluent-bit/bin/fluent-bit -i forward -o stdout -p format=json_lines -f 1
```
🛠️ Refactor suggestion
Ensure JetStream data persistence and pin image stability
- The NATS container enables JetStream (`-js`) but has no volume mount for `/data`. Without persisting `/data`, all streams will be lost on container restart.
- You’ve pinned `nats:2.11.4`, which is good; ensure this version aligns with your cluster compatibility requirements.
Apply this diff to add a named volume for JetStream data:

```diff
@@ services:
   nats:
     image: nats:2.11.4
     command: ["-js", "-m", "8222"]
+    volumes:
+      - nats_data:/data
     networks:
       - hades
     ports:
       - "4222:4222"
       - "8222:8222"
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"]
       interval: 10s
       timeout: 5s
       retries: 3
```

And add at the end of compose.yml:

```yaml
volumes:
  nats_data:
```
🤖 Prompt for AI Agents
In compose.yml around lines 39 to 51, the NATS service enables JetStream but
lacks a volume mount for /data, risking data loss on container restart. Fix this
by adding a volume mount for /data using a named volume (e.g., nats_data) under
the nats service volumes section. Then, at the end of the compose.yml file,
define the named volume nats_data under the volumes key to ensure JetStream data
persistence.
```go
		msg.Nak() // Negative acknowledgment, message will be redelivered
		return
	}

	entry, err := hc.kv.Get(ctx, msg_id.String())
	if err != nil {
		slog.Error("Failed to get message from KeyValue store", "error", err, "id", msg_id.String())
		msg.Nak() // Negative acknowledgment, message will be redelivered
		return
	}
	// Process the message
	var job payload.QueuePayload
	if err := json.Unmarshal(entry.Value(), &job); err != nil {
		slog.Error("Failed to unmarshal message payload", "error", err, "data", string(msg.Data()))
		msg.Nak() // Negative acknowledgment, message will be redelivered
		return
```
`msg.Nak()` errors are ignored – potential message loss
Every call to `msg.Nak()` returns an error that must be handled; otherwise a failed NAK silently discards the message and breaks at-least-once semantics.

```diff
- msg.Nak() // Negative acknowledgment, message will be redelivered
+ if err := msg.Nak(); err != nil {
+     slog.Error("Failed to NAK message", "error", err)
+ }
```
Apply the same pattern to the other two occurrences in this block.
🧰 Tools
🪛 golangci-lint (1.64.8)
242-242: Error return value of `msg.Nak` is not checked (errcheck)
249-249: Error return value of `msg.Nak` is not checked (errcheck)
256-256: Error return value of `msg.Nak` is not checked (errcheck)
🤖 Prompt for AI Agents
In shared/utils/queue.go lines 242 to 257, the calls to msg.Nak() ignore
returned errors, risking silent message loss and breaking at-least-once delivery
guarantees. Update all msg.Nak() calls in this block to capture and handle the
error returned, for example by logging the error, to ensure any failure in
negative acknowledgment is detected and addressed.
```go
	bytesPayload, err := json.Marshal(queuePayloud)
	if err != nil {
		slog.Error("Failed to marshal payload", "error", err)
		return err
	}
	_, err = hp.js.PublishAsync(PrioritySubject(priority), queuePayloud.ID[:], jetstream.WithMsgID(queuePayloud.ID.String()))
	if err != nil {
		return err
	}
```
|
🛠️ Refactor suggestion
PublishAsync fire-and-forget may drop errors & exhaust the async buffer
`PublishAsync` queues the message in an internal buffer and returns immediately. Two issues arise:
- You never wait for, nor inspect, the `PubAckFuture` – write failures (e.g. stream offline, message too large) will be invisible.
- If the producer pushes faster than the buffer drains, the buffer will eventually block the entire goroutine.
At minimum, capture the future and check it, or switch to `Publish` for a synchronous write if throughput is low.
```diff
- _, err = hp.js.PublishAsync(
-     PrioritySubject(priority),
-     queuePayloud.ID[:],
-     jetstream.WithMsgID(queuePayloud.ID.String()),
- )
+ pubAckFuture, err := hp.js.PublishAsync(
+     PrioritySubject(priority),
+     queuePayloud.ID[:],
+     jetstream.WithMsgID(queuePayloud.ID.String()),
+ )
  if err != nil {
      return err
  }
+
+ // Wait (bounded) for the server ack. Ok() and Err() are channels on
+ // jetstream.PubAckFuture, so select over them rather than calling
+ // Ok() as a two-value function.
+ select {
+ case <-pubAckFuture.Ok():
+ case err := <-pubAckFuture.Err():
+     return err
+ case <-ctx.Done():
+     return ctx.Err()
+ }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	bytesPayload, err := json.Marshal(queuePayloud)
	if err != nil {
		slog.Error("Failed to marshal payload", "error", err)
		return err
	}
	pubAckFuture, err := hp.js.PublishAsync(
		PrioritySubject(priority),
		queuePayloud.ID[:],
		jetstream.WithMsgID(queuePayloud.ID.String()),
	)
	if err != nil {
		return err
	}

	// Wait (bounded) for the server ack via the future's channels.
	select {
	case <-pubAckFuture.Ok():
	case err := <-pubAckFuture.Err():
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
```
🤖 Prompt for AI Agents
In shared/utils/queue.go around lines 163 to 172, the code uses PublishAsync
without handling the returned PubAckFuture, which can cause unnoticed write
failures and potential blocking if the async buffer fills. To fix this, capture
the PubAckFuture returned by PublishAsync, then wait for and check its result to
handle any errors. Alternatively, if throughput is low, replace PublishAsync
with the synchronous Publish method to ensure immediate error handling.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Chores