-
Notifications
You must be signed in to change notification settings - Fork 0
Initial skeleton code for Agent service #651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
30004c3
888b831
48bd7bc
7aea3f3
3c67236
589e155
d04ab00
5196a68
5e1492c
aefbd03
aca2b93
a212411
525d650
7ee665f
55e8a61
4bf8871
48cae6b
b446cd9
6ff0047
d380311
67c564c
3a327d8
7e1c366
818110c
b531902
215a883
d9b02ca
41ef316
01f78db
020c73d
b34865f
a6429e7
2cf48f1
0470a52
aeca4e9
49ccfeb
70ec3e2
5a6f18f
f12009f
9aa6fff
7090b59
ec24f3e
3b6c7bf
afe6df1
2d20445
a3472a8
817e724
a6ab0e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package ai.chronon.orchestration.agent | ||
|
||
/** Configuration for Agent components. | ||
* | ||
* TODO: To move these to a separate file which can be configured per client | ||
*/ | ||
object AgentConfig { | ||
|
||
val orchestrationServiceHostname: String = "localhost" | ||
|
||
val orchestrationServicePort: Int = 3903 | ||
|
||
val pollingIntervalMs: Long = 5 * 60 * 1000 | ||
|
||
val statusReportingIntervalMs: Long = 5 * 60 * 1000 | ||
|
||
val topicId: String = "test-topic" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package ai.chronon.orchestration.agent | ||
|
||
import ai.chronon.api.{Job, JobStatusType} | ||
|
||
/** TODO: To extend this further if needed and have implementations for both GCP and AWS. | ||
* Just a place holder will change a lot in coming PRs | ||
*/ | ||
trait JobExecutionService { | ||
|
||
def submitJob(job: Job): Unit | ||
|
||
def getJobStatus(jobId: String): JobStatusType | ||
} | ||
|
||
/** In-memory implementation of JobExecutionService for testing and development. | ||
*/ | ||
class InMemoryJobExecutionService extends JobExecutionService { | ||
private val jobStatuses = scala.collection.mutable.Map[String, JobStatusType]() | ||
|
||
override def submitJob(job: Job): Unit = { | ||
jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING) | ||
} | ||
|
||
override def getJobStatus(jobId: String): JobStatusType = { | ||
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN) | ||
} | ||
} | ||
|
||
object JobExecutionService { | ||
def createInMemory(): JobExecutionService = new InMemoryJobExecutionService() | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,73 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package ai.chronon.orchestration.agent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.api.{Job, JobStatusType} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Trait representing a key-value store service. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* This service provides abstraction for key-value storage operations, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* allowing the application to work with different storage implementations. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Just a place holder will change a lot in coming PRs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
trait KVStore { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we instead call this AgentStore or something more specific than KV store? The interface isn't generic enough to be a kv store (unlike the KVStore trait we have in api which is very generic..) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, makes sense. Addressed this in the later PR #668 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Stores a job with the given ID. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param jobId The unique identifier for the job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param job The job object to store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def storeJob(jobId: String, job: Job): Unit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+19
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent ❓ Verification inconclusiveAdd validation for inputs. No checks for null or empty jobId. /** Stores a job with the given ID.
*
* @param jobId The unique identifier for the job
* @param job The job object to store
*/
- def storeJob(jobId: String, job: Job): Unit
+ def storeJob(jobId: String, job: Job): Unit // Implementation example:
override def storeJob(jobId: String, job: Job): Unit = {
require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
require(job != null, "Job cannot be null")
// ... rest of implementation
} Input validation required: override def storeJob(jobId: String, job: Job): Unit = {
require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
require(job != null, "Job cannot be null")
// ... rest of implementation
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Retrieves a job with the given ID. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param jobId The unique identifier for the job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @return Option containing the job if found, None otherwise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def getJob(jobId: String): Option[Job] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Retrieves all jobs stored in the system. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @return List of all jobs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def getAllJobs: List[Job] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Updates the status of a job with the given ID. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param jobId The unique identifier for the job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param status The new status of the job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def updateJobStatus(jobId: String, status: JobStatusType): Unit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** In-memory implementation of KVStore for testing and development. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class InMemoryKVStore extends KVStore { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private val store = scala.collection.mutable.Map[String, Job]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
override def storeJob(jobId: String, job: Job): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
store.put(jobId, job) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
override def getJob(jobId: String): Option[Job] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
store.get(jobId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
override def getAllJobs: List[Job] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
store.values.toList | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
override def updateJobStatus(jobId: String, status: JobStatusType): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
store.get(jobId).foreach { job => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
job.jobInfo.setCurrentStatus(status) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+45
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make InMemoryKVStore thread-safe. Current implementation uses mutable map without synchronization, potentially leading to race conditions. class InMemoryKVStore extends KVStore {
- private val store = scala.collection.mutable.Map[String, Job]()
+ private val store = new java.util.concurrent.ConcurrentHashMap[String, Job]()
override def storeJob(jobId: String, job: Job): Unit = {
- store.put(jobId, job)
+ store.put(jobId, job)
}
override def getJob(jobId: String): Option[Job] = {
- store.get(jobId)
+ Option(store.get(jobId))
}
override def getAllJobs: List[Job] = {
- store.values.toList
+ import scala.jdk.CollectionConverters._
+ store.values().asScala.toList
}
override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
- store.get(jobId).foreach { job =>
- job.jobInfo.setCurrentStatus(status)
+ Option(store.get(jobId)).foreach { job =>
+ // Create updated job with new status to avoid race conditions
+ val updatedJob = job.deepCopy()
+ updatedJob.jobInfo.setCurrentStatus(status)
+ store.put(jobId, updatedJob)
}
}
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
object KVStore { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Factory method to create an in-memory KVStore instance. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @return An in-memory KVStore implementation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def createInMemory(): KVStore = new InMemoryKVStore() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,109 @@ | ||||||||||||||||||||||||||||||||||||||||||
package ai.chronon.orchestration.agent.handlers | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.api.{Job, JobInfo} | ||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.orchestration.agent.{AgentConfig, JobExecutionService, KVStore} | ||||||||||||||||||||||||||||||||||||||||||
import io.vertx.core.{AsyncResult, Handler} | ||||||||||||||||||||||||||||||||||||||||||
import io.vertx.core.buffer.Buffer | ||||||||||||||||||||||||||||||||||||||||||
import io.vertx.core.json.{JsonArray, JsonObject} | ||||||||||||||||||||||||||||||||||||||||||
import io.vertx.ext.web.client.{HttpResponse, WebClient} | ||||||||||||||||||||||||||||||||||||||||||
import org.slf4j.{Logger, LoggerFactory} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
import scala.util.{Failure, Success, Try} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/** Handler for job polling operations. */ | ||||||||||||||||||||||||||||||||||||||||||
class JobPollingHandler( | ||||||||||||||||||||||||||||||||||||||||||
webClient: WebClient, | ||||||||||||||||||||||||||||||||||||||||||
kvStore: KVStore, | ||||||||||||||||||||||||||||||||||||||||||
jobExecutionService: JobExecutionService | ||||||||||||||||||||||||||||||||||||||||||
) extends Handler[java.lang.Long] { | ||||||||||||||||||||||||||||||||||||||||||
private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler]) | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/** Vert.x Handler implementation that gets called periodically. | ||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||
* @param timerId The ID of the timer that triggered this handler | ||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||
override def handle(timerId: java.lang.Long): Unit = { | ||||||||||||||||||||||||||||||||||||||||||
pollForJobs() | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/** Polls the orchestration service for new jobs. */ | ||||||||||||||||||||||||||||||||||||||||||
private def pollForJobs(): Unit = { | ||||||||||||||||||||||||||||||||||||||||||
logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}") | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||
val host = AgentConfig.orchestrationServiceHostname | ||||||||||||||||||||||||||||||||||||||||||
val port = AgentConfig.orchestrationServicePort | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
logger.info(s"Connecting to host:$host at port:$port") | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Make HTTP request to orchestration service | ||||||||||||||||||||||||||||||||||||||||||
webClient | ||||||||||||||||||||||||||||||||||||||||||
.get(port, host, "/jobs/list") | ||||||||||||||||||||||||||||||||||||||||||
.addQueryParam("topic_id", AgentConfig.topicId) | ||||||||||||||||||||||||||||||||||||||||||
.send(createJobPollingResponseHandler()) | ||||||||||||||||||||||||||||||||||||||||||
} catch { | ||||||||||||||||||||||||||||||||||||||||||
case e: Exception => | ||||||||||||||||||||||||||||||||||||||||||
logger.error("Error while polling for jobs", e) | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/** Creates a response handler for job polling HTTP requests */ | ||||||||||||||||||||||||||||||||||||||||||
private def createJobPollingResponseHandler(): Handler[AsyncResult[HttpResponse[Buffer]]] = { | ||||||||||||||||||||||||||||||||||||||||||
(ar: AsyncResult[HttpResponse[Buffer]]) => | ||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||
if (ar.succeeded()) { | ||||||||||||||||||||||||||||||||||||||||||
val response = ar.result() | ||||||||||||||||||||||||||||||||||||||||||
if (response.statusCode() == 200) { | ||||||||||||||||||||||||||||||||||||||||||
val responseBody = response.bodyAsJsonObject() | ||||||||||||||||||||||||||||||||||||||||||
processJobsResponse(responseBody) | ||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||
logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}") | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||
logger.error("Failed to poll for jobs", ar.cause()) | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
/** Processes the response from the orchestration service containing jobs. */ | ||||||||||||||||||||||||||||||||||||||||||
private def processJobsResponse(responseJson: JsonObject): Unit = { | ||||||||||||||||||||||||||||||||||||||||||
Try { | ||||||||||||||||||||||||||||||||||||||||||
// Extract jobs to start | ||||||||||||||||||||||||||||||||||||||||||
val jobsToStartJson = responseJson.getJsonArray("jobsToStart", new JsonArray()) | ||||||||||||||||||||||||||||||||||||||||||
val jobsToStart = (0 until jobsToStartJson.size()).map { i => | ||||||||||||||||||||||||||||||||||||||||||
val jobJson = jobsToStartJson.getJsonObject(i) | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Create JobInfo | ||||||||||||||||||||||||||||||||||||||||||
val jobInfo = new JobInfo() | ||||||||||||||||||||||||||||||||||||||||||
jobInfo.setJobId(jobJson.getString("jobId")) | ||||||||||||||||||||||||||||||||||||||||||
// Set other job info properties as needed | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Create Job with JobInfo | ||||||||||||||||||||||||||||||||||||||||||
val job = new Job() | ||||||||||||||||||||||||||||||||||||||||||
job.setJobInfo(jobInfo) | ||||||||||||||||||||||||||||||||||||||||||
// Set other job properties as needed | ||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+76
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainIncomplete job creation implementation. Comments indicate missing properties. 🏁 Script executed: #!/bin/bash
# Verify full JobInfo and Job structure requirements
rg -A 5 "class JobInfo|class Job" --type scala Length of output: 6986 Action: Fully initialize Job and JobInfo. |
||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
job | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
logger.info(s"Received ${jobsToStart.size} jobs to start") | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Process each job | ||||||||||||||||||||||||||||||||||||||||||
jobsToStart.foreach { job => | ||||||||||||||||||||||||||||||||||||||||||
// TODO: To handle all other cases | ||||||||||||||||||||||||||||||||||||||||||
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) { | ||||||||||||||||||||||||||||||||||||||||||
// Store job in KV store | ||||||||||||||||||||||||||||||||||||||||||
kvStore.storeJob(job.jobInfo.getJobId, job) | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Submit job to cluster | ||||||||||||||||||||||||||||||||||||||||||
jobExecutionService.submitJob(job) | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+94
to
+100
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Job submission lacks error handling. Add try-catch around store and submit operations. - // Store job in KV store
- kvStore.storeJob(job.jobInfo.getJobId, job)
-
- // Submit job to cluster
- jobExecutionService.submitJob(job)
+ try {
+ // Store job in KV store
+ kvStore.storeJob(job.jobInfo.getJobId, job)
+
+ // Submit job to cluster
+ jobExecutionService.submitJob(job)
+ logger.debug(s"Successfully submitted job ${job.jobInfo.getJobId}")
+ } catch {
+ case e: Exception =>
+ logger.error(s"Failed to store/submit job ${job.jobInfo.getJobId}", e)
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} match { | ||||||||||||||||||||||||||||||||||||||||||
case Success(_) => | ||||||||||||||||||||||||||||||||||||||||||
logger.info("Successfully processed jobs response") | ||||||||||||||||||||||||||||||||||||||||||
case Failure(e) => | ||||||||||||||||||||||||||||||||||||||||||
logger.error("Error processing jobs response", e) | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,95 @@ | ||||||||||||||||||||||||||
package ai.chronon.orchestration.agent.handlers | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
import ai.chronon.api.JobStatusType | ||||||||||||||||||||||||||
import ai.chronon.orchestration.agent.{AgentConfig, JobExecutionService, KVStore} | ||||||||||||||||||||||||||
import io.vertx.core.{AsyncResult, Handler} | ||||||||||||||||||||||||||
import io.vertx.core.buffer.Buffer | ||||||||||||||||||||||||||
import io.vertx.core.json.JsonObject | ||||||||||||||||||||||||||
import io.vertx.ext.web.client.{HttpResponse, WebClient} | ||||||||||||||||||||||||||
import org.slf4j.{Logger, LoggerFactory} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
import scala.util.{Failure, Success, Try} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** Handler for status reporting operations. */ | ||||||||||||||||||||||||||
class StatusReportingHandler( | ||||||||||||||||||||||||||
webClient: WebClient, | ||||||||||||||||||||||||||
kvStore: KVStore, | ||||||||||||||||||||||||||
jobExecutionService: JobExecutionService | ||||||||||||||||||||||||||
) extends Handler[java.lang.Long] { | ||||||||||||||||||||||||||
private val logger: Logger = LoggerFactory.getLogger(classOf[StatusReportingHandler]) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** Vert.x Handler implementation that gets called periodically. | ||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||
* @param timerId The ID of the timer that triggered this handler | ||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||
override def handle(timerId: java.lang.Long): Unit = { | ||||||||||||||||||||||||||
reportJobStatuses() | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** Checks the status of all jobs and reports changes. */ | ||||||||||||||||||||||||||
private def reportJobStatuses(): Unit = { | ||||||||||||||||||||||||||
logger.debug("Checking for job status updates") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||
// Get all jobs from KV store | ||||||||||||||||||||||||||
val jobs = kvStore.getAllJobs | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// For each job, check current status and report if changed | ||||||||||||||||||||||||||
jobs.foreach { job => | ||||||||||||||||||||||||||
val jobId = job.jobInfo.getJobId | ||||||||||||||||||||||||||
val currentStatus = job.jobInfo.getCurrentStatus | ||||||||||||||||||||||||||
val latestStatus = jobExecutionService.getJobStatus(jobId) | ||||||||||||||||||||||||||
Comment on lines
+39
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing null checks for job.jobInfo. Add null checks to prevent NPEs. jobs.foreach { job =>
- val jobId = job.jobInfo.getJobId
- val currentStatus = job.jobInfo.getCurrentStatus
+ if (job.jobInfo == null) {
+ logger.warn("Found job with null jobInfo, skipping")
+ return
+ }
+ val jobId = job.jobInfo.getJobId
+ val currentStatus = job.jobInfo.getCurrentStatus 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if (currentStatus != latestStatus) { | ||||||||||||||||||||||||||
logger.info(s"Job $jobId status changed from $currentStatus to $latestStatus") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Update status in KV store | ||||||||||||||||||||||||||
kvStore.updateJobStatus(jobId, latestStatus) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Report status change to orchestration service | ||||||||||||||||||||||||||
reportStatusToOrchestration(jobId, latestStatus) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} catch { | ||||||||||||||||||||||||||
case e: Exception => | ||||||||||||||||||||||||||
logger.error("Error while reporting job statuses", e) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** Reports a job status update to the orchestration service. */ | ||||||||||||||||||||||||||
private def reportStatusToOrchestration(jobId: String, status: JobStatusType): Unit = { | ||||||||||||||||||||||||||
// Create status update payload | ||||||||||||||||||||||||||
val statusUpdate = new JsonObject() | ||||||||||||||||||||||||||
.put("jobId", jobId) | ||||||||||||||||||||||||||
.put("status", status.toString) | ||||||||||||||||||||||||||
.put("timestamp", System.currentTimeMillis()) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
val host = AgentConfig.orchestrationServiceHostname | ||||||||||||||||||||||||||
val port = AgentConfig.orchestrationServicePort | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
logger.info(s"Connecting to host:$host at port:$port") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Send status update to orchestration service | ||||||||||||||||||||||||||
webClient | ||||||||||||||||||||||||||
.put(port, host, s"/jobs/$jobId/status") | ||||||||||||||||||||||||||
.sendJsonObject(statusUpdate, createStatusUpdateHandler(jobId, status)) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** Creates a response handler for status update HTTP requests */ | ||||||||||||||||||||||||||
private def createStatusUpdateHandler( | ||||||||||||||||||||||||||
jobId: String, | ||||||||||||||||||||||||||
status: JobStatusType): Handler[AsyncResult[HttpResponse[Buffer]]] = { (ar: AsyncResult[HttpResponse[Buffer]]) => | ||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||
if (ar.succeeded()) { | ||||||||||||||||||||||||||
val response = ar.result() | ||||||||||||||||||||||||||
if (response.statusCode() == 200) { | ||||||||||||||||||||||||||
logger.info(s"Successfully reported status update for job $jobId: $status") | ||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||
logger.error(s"Failed to report status update: ${response.statusCode()} ${response.statusMessage()}") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||
logger.error("Failed to report status update", ar.cause()) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice if we could inject that KV store here (if we know the apis match what we need)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed this in the later PR #668