Initial skeleton code for Agent service #651
… single responsibility for better maintainance with unit/integration tests
…ave gcp specific dependencies
…erences, tests working
… with out getMissingSteps implementation
… single responsibility for better maintainance with unit/integration tests
…ave gcp specific dependencies
…tion range for dependent nodes
…solver to different location
Walkthrough
This update refactors the job orchestration system by introducing new Scala components for job polling, status reporting, and in-memory job execution and storage. The Thrift API changes replace the …
Sequence Diagram(s)
sequenceDiagram
    participant Agent as Agent Verticle
    participant Orchestration as Orchestration Service
    participant KV as KVStore
    participant Exec as JobExecutionService
    Agent->>Orchestration: GET /jobs/list?topic_id=...
    Orchestration-->>Agent: JSON list of jobs
    Agent->>KV: Check if job exists
    alt Job not in KV
        Agent->>KV: Store job
        Agent->>Exec: submitJob(job)
    end

sequenceDiagram
    participant Agent as StatusReportingVerticle
    participant KV as KVStore
    participant Exec as JobExecutionService
    participant Orchestration as Orchestration Service
    Agent->>KV: getAllJobs
    loop For each job
        Agent->>Exec: getJobStatus(jobId)
        alt Status changed
            Agent->>KV: updateJobStatus(jobId, newStatus)
            Agent->>Orchestration: PUT /jobs/status (jobId, newStatus, timestamp)
            Orchestration-->>Agent: 200 OK or error
        end
    end
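The polling flow in the first diagram can be sketched in a few lines of plain Scala. This is only an illustration under stated assumptions: `SketchJob`, `SketchStore`, `SketchExecutor` and `PollingSketch` are hypothetical stand-ins for the PR's `Job`, `KVStore` and `JobExecutionService`, and the HTTP call to the orchestration service is replaced by a plain list of polled jobs.

```scala
import scala.collection.mutable

// Hypothetical minimal stand-ins for the PR's Job, KVStore and JobExecutionService.
final case class SketchJob(jobId: String)

final class SketchStore {
  private val jobs = mutable.LinkedHashMap.empty[String, SketchJob]
  def getJob(jobId: String): Option[SketchJob] = jobs.get(jobId)
  def storeJob(jobId: String, job: SketchJob): Unit = jobs.update(jobId, job)
}

final class SketchExecutor {
  // Records submitted job ids in submission order.
  val submitted = mutable.ArrayBuffer.empty[String]
  def submitJob(job: SketchJob): Unit = submitted += job.jobId
}

object PollingSketch {
  // Mirrors the first diagram: only jobs absent from the store are stored and submitted.
  def handlePolledJobs(polled: Seq[SketchJob], store: SketchStore, exec: SketchExecutor): Unit =
    polled.foreach { job =>
      if (store.getJob(job.jobId).isEmpty) {
        store.storeJob(job.jobId, job)
        exec.submitJob(job)
      }
    }
}
```

Because the store check comes first, a job that shows up again in a later poll is not submitted a second time.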
Warning: Review ran into problems
Problems: GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.
Actionable comments posted: 10
🧹 Nitpick comments (10)
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (2)
82-93: Add immediate initial check. Status reporting only occurs at intervals, without immediate execution.
  private def startStatusReporting(startPromise: Promise[Void]): Unit = {
    if (isReporting.compareAndSet(false, true)) {
      logger.info("Starting status reporting")
+     // Run initial status check immediately
+     statusReportingHandler.handle(0L)
+
      // Set up periodic status reporting with handler
      reportingTimerId = vertx.setPeriodic(AgentConfig.statusReportingIntervalMs, statusReportingHandler)
      startPromise.complete()
    } else {
      startPromise.complete()
    }
  }
118-126: Allow WebClient injection for testing. The factory method supports KVStore and JobExecutionService injection, but not WebClient.
  def createWithDependencies(
      kvStore: KVStore,
-     jobExecutionService: JobExecutionService
+     jobExecutionService: JobExecutionService,
+     webClient: WebClient = null
  ): StatusReportingVerticle = {
    val verticle = new StatusReportingVerticle()
    verticle.kvStore = kvStore
    verticle.jobExecutionService = jobExecutionService
+   if (webClient != null) {
+     verticle.webClient = webClient
+   }
    verticle
  }
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (2)
58-62: URL should be configurable, not hardcoded. Extract the endpoint path to configuration.
+ private val jobsListEndpoint = "/jobs/list" // Move to AgentConfig in future updates
  // Make HTTP request to orchestration service
  webClient
-   .get(port, host, "/jobs/list")
+   .get(port, host, jobsListEndpoint)
    .addQueryParam("topic_id", AgentConfig.topicId)
90-90: Missing validation for empty jobs list. Consider adding early return or debug log.
  val jobsToStart = (0 until jobsToStartJson.size()).map { i =>
    // ...
  }
  logger.info(s"Received ${jobsToStart.size} jobs to start")
+
+ if (jobsToStart.isEmpty) {
+   logger.debug("No jobs to start, returning")
+   return
+ }
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (2)
86-86: Excessive logging on each status update. This log generates noise for every status update.
- logger.info(s"Connecting to host:$host at port:$port")
+ logger.debug(s"Connecting to host:$host at port:$port for job $jobId")
33-56: Consider batching status updates. For efficiency, batch multiple status updates.
Implement a batching mechanism that collects status changes and sends them in a single request, reducing network overhead for multiple simultaneous status changes.
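A minimal sketch of what such a batching mechanism could look like, using only the Scala standard library. `StatusChange` and `StatusBatcher` are hypothetical names, the payload shape is an assumption (the thread does not show the real one), and `send` stands in for whatever single HTTP request would carry the batch.

```scala
import scala.collection.mutable

// Hypothetical status change record; the PR's real payload shape is not shown in the thread.
final case class StatusChange(jobId: String, newStatus: String)

// Collects status changes and hands them to `send` in one call per flush.
final class StatusBatcher(send: Seq[StatusChange] => Unit) {
  private val pending = mutable.LinkedHashMap.empty[String, StatusChange]

  // A later change for the same job overwrites the earlier one, so only the latest status is sent.
  def record(change: StatusChange): Unit = synchronized {
    pending.update(change.jobId, change)
  }

  // Sends everything collected so far as a single batch; does nothing when there is nothing pending.
  def flush(): Unit = {
    val batch = synchronized {
      val snapshot = pending.values.toList
      pending.clear()
      snapshot
    }
    if (batch.nonEmpty) send(batch)
  }
}
```

In this sketch `flush()` would be called from the periodic reporting handler after all jobs have been checked. Whether the orchestration service would accept a batched payload is not established in this thread.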
orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (2)
21-26: Method comment and return type mismatch. Comment says "as a string" but returns JobStatusType.
  /** Gets the current status of a job.
   *
   * @param jobId The unique identifier for the job
-  * @return The status of the job as a string
+  * @return The status of the job as JobStatusType
   */
  def getJobStatus(jobId: String): JobStatusType
10-12: TODO comment needs to be addressed in future PRs. Ensure this is tracked in your issue tracker.
Would you like me to create a GitHub issue to track extending this service for GCP and AWS implementations?
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (2)
80-91: Potential race condition in startPolling. compareAndSet is used correctly, but the code flow is confusing.
  private def startPolling(startPromise: Promise[Void]): Unit = {
    if (isPolling.compareAndSet(false, true)) {
      logger.info("Starting job polling")
      // Set up periodic polling with handler
      pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)
      startPromise.complete()
    } else {
+     logger.info("Job polling already active, not starting again")
      startPromise.complete()
    }
  }
46-68: Consider Constructor Dependency Injection. Current design has mutable fields set after construction.
A cleaner approach would be to use constructor dependency injection instead of field initialization. This would make dependencies explicit and improve testability while reducing mutability.
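As a rough illustration of the constructor-injection idea, assuming the component can be built by the caller (for a Vert.x verticle this means deploying an instance rather than a class name): `JobLookup`, `JobSubmitter` and `PollingComponent` are hypothetical names, not types from the PR.

```scala
// Hypothetical narrow interfaces standing in for the PR's KVStore and JobExecutionService.
trait JobLookup { def knownJobIds: Set[String] }
trait JobSubmitter { def submit(jobId: String): Unit }

// Dependencies are constructor parameters, so they are immutable and visible in the signature.
final class PollingComponent(lookup: JobLookup, submitter: JobSubmitter) {
  // Submits the job only when the lookup does not know it yet; returns whether it was submitted.
  def submitIfNew(jobId: String): Boolean =
    if (lookup.knownJobIds.contains(jobId)) false
    else { submitter.submit(jobId); true }
}
```

A test can then pass small anonymous implementations directly, with no mutable fields to set after construction.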
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (12)
api/thrift/agent.thrift (2 hunks)
orchestration/BUILD.bazel (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (1 hunks)
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/JobsHandler.scala (2 hunks)
orchestration/src/test/scala/ai/chronon/orchestration/test/service/handlers/JobsHandlerSpec.scala (2 hunks)
tools/build_rules/prelude_bazel (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (2)
api/src/main/scala/ai/chronon/api/ParametricMacro.scala (1): api (66-66)
orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1): createInMemory (72-72)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (13)
tools/build_rules/prelude_bazel (2)
92-92: Added Vert.x URI Template dependency. The addition of the vertx-uri-template dependency supports URI parsing in the new Agent service.
102-102: Added Vert.x URI Template test dependency. Consistent addition of the dependency for test environment.
api/thrift/agent.thrift (2)
77-77: Updated Job struct to reference JobInfo. Good refactoring to support more comprehensive job metadata.
95-100: ❓ Verification inconclusive
Added explicit UNKNOWN status and shifted enum values.
Adding UNKNOWN status improves API robustness, but shifting enum values could break existing code.
🏁 Script executed:
#!/bin/bash
# Find occurrences of JobStatusType enum values being used directly as integers
rg -A 3 -B 3 "\bJobStatusType\b.*\b[0-9]+" --type scala
Length of output: 57
Action: Verify enum value usage across clients.
- The api/thrift/agent.thrift enum now starts with UNKNOWN = 0, shifting subsequent values.
- Automated search for direct integer usage in Scala produced no output, but this is inconclusive.
- Ensure manual review of all consumers of JobStatusType to avoid unexpected breakage.
orchestration/BUILD.bazel (1)
115-115: Added resources inclusion to assembly. Essential addition for packaging configuration files needed by the Agent service.
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/JobsHandler.scala (2)
4-4: Added JobInfo import. Required for the new job structure.
45-49: Updated job creation to use JobInfo. Implementation properly adapts to the new Thrift structure.
orchestration/src/test/scala/ai/chronon/orchestration/test/service/handlers/JobsHandlerSpec.scala (2)
68-69: Updated job ID access pattern correctly. Code accesses jobId via the new jobInfo structure, aligning with the Thrift API changes.
96-96: Job ID access correctly updated. Updated to access jobId through jobInfo structure, consistent with the data model changes.
orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1)
3-6: TODO comment needs implementation plan. The comment indicates these configurations need to be moved to a client-specific file.
Do you plan to implement this configuration file in a follow-up PR? Consider adding a tracking issue to your project management system.
orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1)
5-12: Documentation clearly states implementation status. The TODO comment provides context that this is a placeholder implementation.
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (1)
87-87: ❓ Verification inconclusive
Verify StatusReportingHandler implements Handler.
StatusReportingHandler is directly used with setPeriodic, ensure it implements Handler.
Run this script to check the implementation:
🏁 Script executed:
#!/bin/bash
# Check if StatusReportingHandler implements Handler<Long>
ast-grep --pattern 'class StatusReportingHandler$_$ implements Handler<Long>' || echo "StatusReportingHandler doesn't implement Handler<Long>"
Length of output: 79
Verify StatusReportingHandler Conformance
Please ensure that StatusReportingHandler extends Handler[Long] as required by its use in setPeriodic. The previous automated check was inconclusive; a manual verification is advised.
- File: orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala, line 87
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1)
93-108: Good cleanup in stop method. Resources are properly released on shutdown.
  /** Hostname for the orchestration service */
  val orchestrationServiceHostname: String = "localhost"

  /** Port for the orchestration service */
  val orchestrationServicePort: Int = 3903

  /** Interval for polling jobs from the orchestration service (in milliseconds) */
  val pollingIntervalMs: Long = 5 * 60 * 1000

  /** Interval for reporting job status to the orchestration service (in milliseconds) */
  val statusReportingIntervalMs: Long = 5 * 60 * 1000

  /** PubSub topic id to pull job requests from */
  val topicId: String = "test-topic"
}
🛠️ Refactor suggestion
Replace hardcoded values with configurable parameters.
Hardcoded values limit flexibility across different environments.
- /** Hostname for the orchestration service */
- val orchestrationServiceHostname: String = "localhost"
-
- /** Port for the orchestration service */
- val orchestrationServicePort: Int = 3903
-
- /** Interval for polling jobs from the orchestration service (in milliseconds) */
- val pollingIntervalMs: Long = 5 * 60 * 1000
-
- /** Interval for reporting job status to the orchestration service (in milliseconds) */
- val statusReportingIntervalMs: Long = 5 * 60 * 1000
-
- /** PubSub topic id to pull job requests from */
- val topicId: String = "test-topic"
+ /** Hostname for the orchestration service */
+ val orchestrationServiceHostname: String = sys.env.getOrElse("ORCHESTRATION_SERVICE_HOSTNAME", "localhost")
+
+ /** Port for the orchestration service */
+ val orchestrationServicePort: Int = sys.env.getOrElse("ORCHESTRATION_SERVICE_PORT", "3903").toInt
+
+ /** Interval for polling jobs from the orchestration service (in milliseconds) */
+ val pollingIntervalMs: Long = sys.env.getOrElse("POLLING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
+
+ /** Interval for reporting job status to the orchestration service (in milliseconds) */
+ val statusReportingIntervalMs: Long = sys.env.getOrElse("STATUS_REPORTING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
+
+ /** PubSub topic id to pull job requests from */
+ val topicId: String = sys.env.getOrElse("TOPIC_ID", "test-topic")
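One thing the suggestion above leaves open is what happens when a variable is set but malformed: `"abc".toInt` throws a `NumberFormatException` when the value is first read. A small hedged sketch of a parsing helper follows; `EnvConfigSketch`, `intSetting` and `longSetting` are hypothetical names, and falling back to the default is only one possible policy (failing fast at startup is another).

```scala
import scala.util.Try

object EnvConfigSketch {
  // Reads `key` from `env`, falling back to `default` when the variable is unset or not a valid Int.
  def intSetting(env: Map[String, String], key: String, default: Int): Int =
    env.get(key).flatMap(raw => Try(raw.trim.toInt).toOption).getOrElse(default)

  // Same idea for Long-valued settings such as the polling interval.
  def longSetting(env: Map[String, String], key: String, default: Long): Long =
    env.get(key).flatMap(raw => Try(raw.trim.toLong).toOption).getOrElse(default)
}
```

Passing the environment as a parameter keeps the helper testable; in the agent it could be called as `EnvConfigSketch.intSetting(sys.env, "ORCHESTRATION_SERVICE_PORT", 3903)`.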
class InMemoryKVStore extends KVStore {
  private val store = scala.collection.mutable.Map[String, Job]()

  override def storeJob(jobId: String, job: Job): Unit = {
    store.put(jobId, job)
  }

  override def getJob(jobId: String): Option[Job] = {
    store.get(jobId)
  }

  override def getAllJobs: List[Job] = {
    store.values.toList
  }

  override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
    store.get(jobId).foreach { job =>
      job.jobInfo.setCurrentStatus(status)
    }
  }
}
Make InMemoryKVStore thread-safe.
Current implementation uses mutable map without synchronization, potentially leading to race conditions.
class InMemoryKVStore extends KVStore {
- private val store = scala.collection.mutable.Map[String, Job]()
+ private val store = new java.util.concurrent.ConcurrentHashMap[String, Job]()
override def storeJob(jobId: String, job: Job): Unit = {
- store.put(jobId, job)
+ store.put(jobId, job)
}
override def getJob(jobId: String): Option[Job] = {
- store.get(jobId)
+ Option(store.get(jobId))
}
override def getAllJobs: List[Job] = {
- store.values.toList
+ import scala.jdk.CollectionConverters._
+ store.values().asScala.toList
}
override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
- store.get(jobId).foreach { job =>
- job.jobInfo.setCurrentStatus(status)
+ Option(store.get(jobId)).foreach { job =>
+ // Create updated job with new status to avoid race conditions
+ val updatedJob = job.deepCopy()
+ updatedJob.jobInfo.setCurrentStatus(status)
+ store.put(jobId, updatedJob)
}
}
}
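In the suggestion above, the read, copy and `put` in `updateJobStatus` are still three separate steps. One way to make the read-modify-write a single atomic operation is `ConcurrentHashMap.computeIfPresent`. The sketch below assumes an immutable stand-in `StoredJob` (hypothetical, replacing the Thrift-generated `Job`) with the status held as a plain string.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hypothetical immutable stand-in for the Thrift-generated Job.
final case class StoredJob(jobId: String, status: String)

final class AtomicJobStore {
  private val store = new ConcurrentHashMap[String, StoredJob]()

  def storeJob(job: StoredJob): Unit = { store.put(job.jobId, job); () }

  def getJob(jobId: String): Option[StoredJob] = Option(store.get(jobId))

  // computeIfPresent applies the remapping function atomically for the key,
  // so no other update to that key can interleave between the read and the write.
  def updateJobStatus(jobId: String, status: String): Unit = {
    store.computeIfPresent(
      jobId,
      new BiFunction[String, StoredJob, StoredJob] {
        override def apply(id: String, job: StoredJob): StoredJob = job.copy(status = status)
      }
    )
    ()
  }
}
```

Updating an id that was never stored is a no-op here, which matches the behaviour of the original `store.get(jobId).foreach { ... }`.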
   */
  def storeJob(jobId: String, job: Job): Unit
💡 Verification agent
❓ Verification inconclusive
Add validation for inputs.
No checks for null or empty jobId.
/** Stores a job with the given ID.
*
* @param jobId The unique identifier for the job
* @param job The job object to store
*/
- def storeJob(jobId: String, job: Job): Unit
+ def storeJob(jobId: String, job: Job): Unit
// Implementation example:
override def storeJob(jobId: String, job: Job): Unit = {
require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
require(job != null, "Job cannot be null")
// ... rest of implementation
}
Input validation required:
Please add inline validations to ensure that jobId is neither null nor empty and that job is not null. For example:
override def storeJob(jobId: String, job: Job): Unit = {
require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
require(job != null, "Job cannot be null")
// ... rest of implementation
}
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) {
  // Store job in KV store
  kvStore.storeJob(job.jobInfo.getJobId, job)

  // Submit job to cluster
  jobExecutionService.submitJob(job)
}
🛠️ Refactor suggestion
Job submission lacks error handling.
Add try-catch around store and submit operations.
- // Store job in KV store
- kvStore.storeJob(job.jobInfo.getJobId, job)
-
- // Submit job to cluster
- jobExecutionService.submitJob(job)
+ try {
+ // Store job in KV store
+ kvStore.storeJob(job.jobInfo.getJobId, job)
+
+ // Submit job to cluster
+ jobExecutionService.submitJob(job)
+ logger.debug(s"Successfully submitted job ${job.jobInfo.getJobId}")
+ } catch {
+ case e: Exception =>
+ logger.error(s"Failed to store/submit job ${job.jobInfo.getJobId}", e)
+ }
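One consequence of the try/catch above is worth spelling out: if `submitJob` throws after `storeJob` succeeded, the job stays in the store, so the `getJob(...).isEmpty` check will skip it on every later poll. A hedged sketch of one alternative follows, in which a failed submission removes the stored entry so a later poll can retry. `RetryableSubmitter` is a hypothetical name, and the removal step assumes a delete operation that the PR's `KVStore` trait does not currently expose.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// `submit` stands in for jobExecutionService.submitJob; `stored` stands in for the KV store.
final class RetryableSubmitter(submit: String => Unit) {
  private val stored = mutable.Set.empty[String]

  def isStored(jobId: String): Boolean = stored.contains(jobId)

  // Stores the id, then submits; on failure the id is removed again so the next poll retries it.
  def storeAndSubmit(jobId: String): Boolean = {
    stored += jobId
    Try(submit(jobId)) match {
      case Success(_) => true
      case Failure(_) =>
        stored -= jobId // assumed delete operation, not part of the PR's KVStore
        false
    }
  }
}
```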
// Create JobInfo
val jobInfo = new JobInfo()
jobInfo.setJobId(jobJson.getString("jobId"))
// Set other job info properties as needed

// Create Job with JobInfo
val job = new Job()
job.setJobInfo(jobInfo)
// Set other job properties as needed
💡 Verification agent
🧩 Analysis chain
Incomplete job creation implementation.
Comments indicate missing properties.
🏁 Script executed:
#!/bin/bash
# Verify full JobInfo and Job structure requirements
rg -A 5 "class JobInfo|class Job" --type scala
Length of output: 6986
Action: Fully initialize Job and JobInfo.
The current implementation only sets the jobId, with placeholder comments for other properties. Please update the code based on the complete specifications of JobInfo and Job to ensure that all mandatory fields are appropriately initialized.
val jobId = job.jobInfo.getJobId
val currentStatus = job.jobInfo.getCurrentStatus
val latestStatus = jobExecutionService.getJobStatus(jobId)
Missing null checks for job.jobInfo.
Add null checks to prevent NPEs.
jobs.foreach { job =>
- val jobId = job.jobInfo.getJobId
- val currentStatus = job.jobInfo.getCurrentStatus
+ if (job.jobInfo == null) {
+ logger.warn("Found job with null jobInfo, skipping")
+ return
+ }
+ val jobId = job.jobInfo.getJobId
+ val currentStatus = job.jobInfo.getCurrentStatus
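Note that in Scala a `return` inside a `foreach` closure is a non-local return: it exits the enclosing method rather than skipping one element, so the remaining jobs would not be processed. A minimal sketch of skipping only the offending job, using hypothetical stand-in classes (`InfoSketch`, `JobSketch`) in place of the Thrift-generated ones:

```scala
// Hypothetical stand-ins; `info` may be null, as with a Thrift-generated field.
final class InfoSketch(val jobId: String)
final class JobSketch(val info: InfoSketch)

object NullSafeIteration {
  // Option(...) turns a null info into None, so flatMap drops that job and continues with the rest.
  def jobIds(jobs: Seq[JobSketch]): Seq[String] =
    jobs.flatMap(job => Option(job.info)).map(_.jobId)
}
```

The same effect can be had inside the existing loop with an `if (job.jobInfo != null) { ... } else { logger.warn(...) }` block instead of `return`.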
override def submitJob(job: Job): Unit = {
  // Simulate job submission
  jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
}
Add null check for job.jobInfo.
Prevent NPE if jobInfo is null.
override def submitJob(job: Job): Unit = {
// Simulate job submission
- jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
+ if (job != null && job.jobInfo != null) {
+ jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
+ } else {
+ throw new IllegalArgumentException("Job or JobInfo cannot be null")
+ }
}
override def getJobStatus(jobId: String): JobStatusType = {
  // Return job status or default to "UNKNOWN"
  jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
🛠️ Refactor suggestion
Missing validation for jobId parameter.
Add check for null or empty jobId.
override def getJobStatus(jobId: String): JobStatusType = {
// Return job status or default to "UNKNOWN"
+ if (jobId == null || jobId.isEmpty) {
+ return JobStatusType.UNKNOWN
+ }
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
logger.info("Starting job polling")

// Set up periodic polling with handler
pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)
🛠️ Refactor suggestion
Configuration should be more explicit.
Don't rely on global AgentConfig.
// Set up periodic polling with handler
-pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)
+val pollingInterval: Long = AgentConfig.pollingIntervalMs
+logger.info(s"Setting polling interval to $pollingInterval ms")
+pollingTimerId = vertx.setPeriodic(pollingInterval, jobPollingHandler)
Committable suggestion skipped: line range outside the PR's diff.
Largely looks good, left a few cosmetic / nit shaped comments
 */
object AgentConfig {

  /** Hostname for the orchestration service */
nit - these comments don't add a ton as your field names are pretty descriptive as is. So you could drop them for brevity
done
 * This service provides abstraction for job submission and status management,
 * allowing the application to work with different execution backends.
 *
 * TODO: To extend this further for handling all types of jobs and have
can remove this as it feels like a useful comment more for the PR description. For the code itself the todo could highlight specific methods / aspects that are missing that you want to address
done
 * TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS
 * Just a place holder will change a lot in coming PRs
 */
trait KVStore {
should we instead call this AgentStore or something more specific than KV store? The interface isn't generic enough to be a kv store (unlike the KVStore trait we have in api which is very generic..)
Sure, makes sense. Addressed this in the later PR #668
 * This service provides abstraction for key-value storage operations,
 * allowing the application to work with different storage implementations.
 *
 * TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS
would be nice if we could inject that KV store here (if we know the apis match what we need)
Addressed this in the later PR #668
logger.info("Polling for jobs")

try {
  logger.info(s"topicId: ${AgentConfig.topicId}")
nit - can merge this log line with the one on 31
done
logger.info(s"topicId: ${AgentConfig.topicId}")

// Create a proper response handler
val responseHandler = new Handler[AsyncResult[HttpResponse[Buffer]]] {
for readability, can pull this out into a separate method or class level handler field
Sure, done.
// Send status update to orchestration service with proper handler
webClient
  .put(port, host, "/jobs/$jobId/status")
should this uri string have a s"..."?
done
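For context on why the `s` prefix matters here: without it, Scala treats `$jobId` as literal text, so the request would go to a path containing the characters `$jobId`. A minimal sketch, where the object name and the `jobId` value are made up for illustration:

```scala
object UriInterpolationExample {
  val jobId: String = "job-42" // hypothetical job id

  // No interpolator: the path contains the literal text "$jobId".
  val literalPath: String = "/jobs/$jobId/status"

  // s-interpolator: the value of jobId is substituted into the path.
  val interpolatedPath: String = s"/jobs/$jobId/status"
}
```

Here `interpolatedPath` is `/jobs/job-42/status`, while `literalPath` still contains the dollar sign and the variable name.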
}

private def createKVStore(): KVStore = {
  // Use the newly created in-memory implementation
redundant comment
done
}

private def createJobExecutionService(): JobExecutionService = {
  // Use the newly created in-memory implementation
redundant comment
Done. In general, Claude Code adds a lot of redundant comments that aren't needed in most cases, so I had to tell it explicitly and give it an example to follow and remember for all future code. It seems to be doing OK now :)
Actionable comments posted: 0
♻️ Duplicate comments (2)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (2)
76-84: ⚠️ Potential issue: Incomplete job initialization.
Fill in required properties for JobInfo and Job.
94-100: ⚠️ Potential issue: Job submission lacks error handling.
Add try-catch around store and submit operations.
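One way to read the error-handling suggestion above is to wrap both the store and the submit step in a single `Try`, so a failure in either one is surfaced instead of escaping the handler. This is only a sketch under assumptions: `JobStore`, `JobExecutor`, `JobSubmission.storeAndSubmit` and the String job id are hypothetical simplifications, not the PR's actual KVStore or JobExecutionService signatures.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical minimal interfaces standing in for the PR's store and execution service.
trait JobStore { def storeJob(jobId: String): Unit }
trait JobExecutor { def submitJob(jobId: String): Unit }

object JobSubmission {
  // Returns Right(jobId) when both steps succeed, Left(message) when either one throws.
  def storeAndSubmit(jobId: String, store: JobStore, executor: JobExecutor): Either[String, String] =
    Try {
      store.storeJob(jobId)
      executor.submitJob(jobId)
    } match {
      case Success(_) => Right(jobId)
      case Failure(e) => Left(s"Failed to store/submit job $jobId: ${e.getMessage}")
    }
}
```

Note that if submission fails after the store step succeeded, the job stays stored but unsubmitted in this sketch; whether to roll back or retry in that case is a design decision the sketch does not make.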
🧹 Nitpick comments (4)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (4)
31-31: Logger information could be combined.
Merge the topic ID log with the connection log.
-    logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}")
-
-    try {
-      val host = AgentConfig.orchestrationServiceHostname
-      val port = AgentConfig.orchestrationServicePort
-
-      logger.info(s"Connecting to host:$host at port:$port")
+    try {
+      val host = AgentConfig.orchestrationServiceHostname
+      val port = AgentConfig.orchestrationServicePort
+
+      logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId} using host:$host at port:$port")
33-38: AgentConfig dependency hardcoded.
Inject config for better testability.
 class JobPollingHandler(
     webClient: WebClient,
     kvStore: KVStore,
-    jobExecutionService: JobExecutionService
+    jobExecutionService: JobExecutionService,
+    config: AgentConfig
 ) extends Handler[java.lang.Long] {
   private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler])

   /** Vert.x Handler implementation that gets called periodically.
     *
     * @param timerId The ID of the timer that triggered this handler
     */
   override def handle(timerId: java.lang.Long): Unit = {
     pollForJobs()
   }

   /** Polls the orchestration service for new jobs.
     */
   private def pollForJobs(): Unit = {
     logger.info(s"Polling for jobs with topicId: ${config.topicId}")

     try {
       val host = config.orchestrationServiceHostname
       val port = config.orchestrationServicePort
51-66: Extract handler to class field.
Move response handler creation to class level.
 class JobPollingHandler(
     webClient: WebClient,
     kvStore: KVStore,
     jobExecutionService: JobExecutionService
 ) extends Handler[java.lang.Long] {
   private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler])
+
+  /** Response handler for job polling HTTP requests */
+  private val jobPollingResponseHandler = new Handler[AsyncResult[HttpResponse[Buffer]]] {
+    override def handle(ar: AsyncResult[HttpResponse[Buffer]]): Unit = {
+      if (ar.succeeded()) {
+        val response = ar.result()
+        if (response.statusCode() == 200) {
+          val responseBody = response.bodyAsJsonObject()
+          processJobsResponse(responseBody)
+        } else {
+          logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}")
+        }
+      } else {
+        logger.error("Failed to poll for jobs", ar.cause())
+      }
+    }
+  }

   /** Vert.x Handler implementation that gets called periodically.
     *
     * @param timerId The ID of the timer that triggered this handler
     */
   override def handle(timerId: java.lang.Long): Unit = {
     pollForJobs()
   }

   /** Polls the orchestration service for new jobs.
     */
   private def pollForJobs(): Unit = {
     logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}")

     try {
       val host = AgentConfig.orchestrationServiceHostname
       val port = AgentConfig.orchestrationServicePort

       logger.info(s"Connecting to host:$host at port:$port")

       // Make HTTP request to orchestration service
       webClient
         .get(port, host, "/jobs/list")
         .addQueryParam("topic_id", AgentConfig.topicId)
-        .send(createJobPollingResponseHandler())
+        .send(jobPollingResponseHandler)
     } catch {
       case e: Exception =>
         logger.error("Error while polling for jobs", e)
     }
   }

-  /** Creates a response handler for job polling HTTP requests */
-  private def createJobPollingResponseHandler(): Handler[AsyncResult[HttpResponse[Buffer]]] = {
-    (ar: AsyncResult[HttpResponse[Buffer]]) =>
-      {
-        if (ar.succeeded()) {
-          val response = ar.result()
-          if (response.statusCode() == 200) {
-            val responseBody = response.bodyAsJsonObject()
-            processJobsResponse(responseBody)
-          } else {
-            logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}")
-          }
-        } else {
-          logger.error("Failed to poll for jobs", ar.cause())
-        }
-      }
-  }
103-107: Success log level inconsistency.
Match log levels for better analysis.
 } match {
   case Success(_) =>
-    logger.info("Successfully processed jobs response")
+    logger.debug("Successfully processed jobs response")
   case Failure(e) =>
     logger.error("Error processing jobs response", e)
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (6)
- api/thrift/agent.thrift (2 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala
- orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala
- api/thrift/agent.thrift
- orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala
- orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala
⏰ Context from checks skipped due to timeout of 90000ms (21)
- GitHub Check: service_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: hub_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: hub_tests
- GitHub Check: online_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: orchestration_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1)
92-93: Implement TODO before production use.
The comment indicates missing handling for other cases.
What other cases need to be handled here? Consider adding specific examples in the TODO comment for future implementation.
## Summary

Skeleton implementation for the Agent service, with job polling and status reporting happening at regular, configurable intervals. Much of this is placeholder code that is going to change a lot in coming PRs, so tests are not added for now; they will be added in coming PRs.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced configuration management for agent components, including polling intervals and service endpoints.
  - Added abstractions and in-memory implementations for job execution and key-value storage services.
  - Implemented periodic job polling and status reporting handlers for orchestration agents.
  - Added Vert.x verticles to manage job polling and status reporting workflows.
- **Improvements**
  - Enhanced job struct to use a detailed job info field.
  - Expanded job status types with an explicit "unknown" state and updated status codes.
- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Packaged resource files into the orchestration binary.
- **Bug Fixes**
  - Updated job handling logic and tests to reflect changes in job ID access patterns.
- **Tests**
  - Adjusted tests to accommodate the new job info structure.