Initial skeleton code for Agent service #651

Merged: 48 commits, Apr 18, 2025

Commits
30004c3
Initial working version with integration tests
kumar-zlai Mar 24, 2025
888b831
Additional refactoring and fixed the full dag spec unit test
kumar-zlai Mar 25, 2025
48bd7bc
Refactored PubSubClient implementation into different components with…
kumar-zlai Mar 26, 2025
7aea3f3
Updated error handling and some future todos
kumar-zlai Mar 26, 2025
3c67236
Minor changes to bump up the gax dependency version
kumar-zlai Mar 26, 2025
589e155
Initial working version after refactoring the generic traits to not h…
kumar-zlai Mar 26, 2025
d04ab00
Refactoring of generic traits and gcp specific implementations complete
kumar-zlai Mar 26, 2025
5196a68
Minor scalafmt fixes
kumar-zlai Mar 26, 2025
5e1492c
Fixed gcloud auth issues using prod config in unit tests
kumar-zlai Mar 27, 2025
aefbd03
Minor change to fix compilation errors in 2.13 build
kumar-zlai Mar 27, 2025
aca2b93
Integrated nodeDao for pulling dependencies and removed DummyNode ref…
kumar-zlai Mar 28, 2025
a212411
Initial partially working version with missingRanges psuedo code
kumar-zlai Apr 1, 2025
525d650
Partial commit after modifying activity function signatures
kumar-zlai Apr 1, 2025
7ee665f
Initial working version with activity function signatures refactoring…
kumar-zlai Apr 2, 2025
55e8a61
Bug fix to wait on specific workflow run while executing missing Node…
kumar-zlai Apr 2, 2025
4bf8871
Added documentation for temporal/persistence layer logic
kumar-zlai Apr 2, 2025
48cae6b
Initial working version with integration tests
kumar-zlai Mar 24, 2025
b446cd9
Additional refactoring and fixed the full dag spec unit test
kumar-zlai Mar 25, 2025
6ff0047
Refactored PubSubClient implementation into different components with…
kumar-zlai Mar 26, 2025
d380311
Updated error handling and some future todos
kumar-zlai Mar 26, 2025
67c564c
Minor changes to bump up the gax dependency version
kumar-zlai Mar 26, 2025
3a327d8
Initial working version after refactoring the generic traits to not h…
kumar-zlai Mar 26, 2025
7e1c366
Refactoring of generic traits and gcp specific implementations complete
kumar-zlai Mar 26, 2025
818110c
Minor scalafmt fixes
kumar-zlai Mar 26, 2025
b531902
Fixed gcloud auth issues using prod config in unit tests
kumar-zlai Mar 27, 2025
215a883
Minor change to fix compilation errors in 2.13 build
kumar-zlai Mar 27, 2025
d9b02ca
Merge branch 'pubsub_poc' into temporal_persistence_layer_integration
kumar-zlai Apr 2, 2025
41ef316
Initial working logic for missing steps and some workflow refactoring
kumar-zlai Apr 4, 2025
01f78db
Merge branch 'main' into temporal_persistence_layer_integration
kumar-zlai Apr 4, 2025
020c73d
Merge branch 'missing_steps_logic' into temporal_persistence_layer_in…
kumar-zlai Apr 4, 2025
b34865f
save
kumar-zlai Apr 4, 2025
a6429e7
Dependency resolver logic refactor to api module with unit tests
kumar-zlai Apr 7, 2025
2cf48f1
Merge branch 'kumar/dependency_resolver_refactor' into temporal_persi…
kumar-zlai Apr 7, 2025
0470a52
Refactored and cleaned up missing steps logic
kumar-zlai Apr 7, 2025
aeca4e9
Changes to persist node table dependencies used for determining parti…
kumar-zlai Apr 8, 2025
49ccfeb
Merge branch 'main' into temporal_persistence_layer_integration
kumar-zlai Apr 8, 2025
70ec3e2
Removed auto-generated thrift files from gitignore conflict
kumar-zlai Apr 8, 2025
5a6f18f
Minor fix for resolving compilation errors due to moving DependencyRe…
kumar-zlai Apr 8, 2025
f12009f
Initial working version for job requests dispatcher endpoint with ref…
kumar-zlai Apr 10, 2025
9aa6fff
Minor changes to documentation
kumar-zlai Apr 10, 2025
7090b59
Minor changes to documentation
kumar-zlai Apr 10, 2025
ec24f3e
Added new endpoint for uploading conf diff
kumar-zlai Apr 10, 2025
3b6c7bf
[save] partial commit needs more changes
kumar-zlai Apr 14, 2025
afe6df1
Merge branch 'main' into kumar/agent_service
kumar-zlai Apr 15, 2025
2d20445
Initial working version with basic setup
kumar-zlai Apr 16, 2025
a3472a8
Minor fix to pass query params
kumar-zlai Apr 16, 2025
817e724
Addressed PR comments
kumar-zlai Apr 18, 2025
a6ab0e7
Merge branch 'main' into kumar/agent_service
kumar-zlai Apr 18, 2025
13 changes: 7 additions & 6 deletions api/thrift/agent.thrift
@@ -75,7 +75,7 @@ union JobBase {
 }

 struct Job {
-  1: optional string jobId
+  1: optional JobInfo jobInfo
   2: optional JobBase jobUnion
   3: optional i32 statusReportInterval
   4: optional i32 maxRetries
@@ -93,11 +93,12 @@ struct JobListResponse {
 }

 enum JobStatusType {
-  PENDING = 0,
-  RUNNING = 1,
-  SUCCEEDED = 2,
-  FAILED = 3,
-  STOPPED = 4
+  UNKNOWN = 0,
+  PENDING = 1,
+  RUNNING = 2,
+  SUCCEEDED = 3,
+  FAILED = 4,
+  STOPPED = 5
 }

 struct ResourceUsage {
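The enum change above shifts every existing `JobStatusType` value by one to make room for `UNKNOWN = 0`. Any integer status that was persisted or sent under the old numbering therefore decodes to a different name under the new one. A minimal sketch of that effect, using plain Scala maps as stand-ins for the generated Thrift enum (the `StatusNumbering` object is hypothetical and not part of the PR):

```scala
object StatusNumbering {
  // Numbering before this PR, taken from the removed lines of the diff.
  val before: Map[Int, String] = Map(
    0 -> "PENDING", 1 -> "RUNNING", 2 -> "SUCCEEDED", 3 -> "FAILED", 4 -> "STOPPED")

  // Numbering after this PR, taken from the added lines of the diff.
  val after: Map[Int, String] = Map(
    0 -> "UNKNOWN", 1 -> "PENDING", 2 -> "RUNNING", 3 -> "SUCCEEDED", 4 -> "FAILED", 5 -> "STOPPED")

  // Integer values whose name differs between the two numberings,
  // as (value, old name, new name), sorted by value.
  def reinterpreted: Seq[(Int, String, String)] =
    before.toSeq.sortBy(_._1).collect {
      case (value, oldName) if after.get(value).exists(_ != oldName) =>
        (value, oldName, after(value))
    }
}
```

Whether this matters in practice depends on whether any status values had already been stored or exchanged between services built at different revisions, which the PR does not state.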
1 change: 1 addition & 0 deletions orchestration/BUILD.bazel
@@ -112,5 +112,6 @@ scala_test_suite(
 jvm_binary(
     name = "orchestration_assembly",
     main_class = "ai.chronon.service.ChrononServiceLauncher",
+    resources = glob(["src/main/resources/**/*"]),
     runtime_deps = [":lib"],
 )
@@ -0,0 +1,18 @@
package ai.chronon.orchestration.agent

/** Configuration for Agent components.
*
* TODO: To move these to a separate file which can be configured per client
*/
object AgentConfig {

val orchestrationServiceHostname: String = "localhost"

val orchestrationServicePort: Int = 3903

val pollingIntervalMs: Long = 5 * 60 * 1000

val statusReportingIntervalMs: Long = 5 * 60 * 1000

val topicId: String = "test-topic"
}
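The TODO in `AgentConfig` mentions moving these values into per-client configuration. One possible shape is sketched below, assuming overrides come from environment variables. The `EnvAgentConfig` class and the variable names such as `AGENT_ORCHESTRATION_HOST` are hypothetical; the defaults mirror the constants in the PR.

```scala
import scala.util.Try

// Hypothetical variant of AgentConfig that takes overrides from a key-value
// environment; missing or unparsable values fall back to the PR's defaults.
final class EnvAgentConfig(env: Map[String, String]) {
  private def string(key: String, default: String): String =
    env.get(key).filter(_.nonEmpty).getOrElse(default)

  private def int(key: String, default: Int): Int =
    env.get(key).flatMap(v => Try(v.trim.toInt).toOption).getOrElse(default)

  private def long(key: String, default: Long): Long =
    env.get(key).flatMap(v => Try(v.trim.toLong).toOption).getOrElse(default)

  val orchestrationServiceHostname: String = string("AGENT_ORCHESTRATION_HOST", "localhost")
  val orchestrationServicePort: Int = int("AGENT_ORCHESTRATION_PORT", 3903)
  val pollingIntervalMs: Long = long("AGENT_POLLING_INTERVAL_MS", 5L * 60 * 1000)
  val statusReportingIntervalMs: Long = long("AGENT_STATUS_INTERVAL_MS", 5L * 60 * 1000)
  val topicId: String = string("AGENT_TOPIC_ID", "test-topic")
}

object EnvAgentConfig {
  // Builds the configuration from the real process environment.
  def fromProcessEnv(): EnvAgentConfig = new EnvAgentConfig(sys.env)
}
```

Taking the environment as a constructor argument keeps the class testable without touching process state.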
@@ -0,0 +1,31 @@
package ai.chronon.orchestration.agent

import ai.chronon.api.{Job, JobStatusType}

/** TODO: To extend this further if needed and have implementations for both GCP and AWS.
* Just a place holder will change a lot in coming PRs
*/
trait JobExecutionService {

def submitJob(job: Job): Unit

def getJobStatus(jobId: String): JobStatusType
}

/** In-memory implementation of JobExecutionService for testing and development.
*/
class InMemoryJobExecutionService extends JobExecutionService {
private val jobStatuses = scala.collection.mutable.Map[String, JobStatusType]()

override def submitJob(job: Job): Unit = {
jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
}

override def getJobStatus(jobId: String): JobStatusType = {
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
}

object JobExecutionService {
def createInMemory(): JobExecutionService = new InMemoryJobExecutionService()
}
@@ -0,0 +1,73 @@
package ai.chronon.orchestration.agent

import ai.chronon.api.{Job, JobStatusType}

/** Trait representing a key-value store service.
*
* This service provides abstraction for key-value storage operations,
* allowing the application to work with different storage implementations.
*
* TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS
Contributor:
would be nice if we could inject that KV store here (if we know the APIs match what we need)

Contributor Author:
Addressed this in the later PR #668

* Just a place holder will change a lot in coming PRs
*/
trait KVStore {
Contributor:
should we instead call this AgentStore or something more specific than KV store? The interface isn't generic enough to be a KV store (unlike the KVStore trait we have in api, which is very generic).

Contributor Author:
Sure, makes sense. Addressed this in the later PR #668

/** Stores a job with the given ID.
*
* @param jobId The unique identifier for the job
* @param job The job object to store
*/
def storeJob(jobId: String, job: Job): Unit
Comment on lines +19 to +20

💡 Verification agent

❓ Verification inconclusive

Add validation for inputs.

There are no checks for a null or empty jobId. Please add validation in the implementations to ensure that jobId is neither null nor empty and that job is not null. For example:

override def storeJob(jobId: String, job: Job): Unit = {
  require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
  require(job != null, "Job cannot be null")
  // ... rest of implementation
}


/** Retrieves a job with the given ID.
*
* @param jobId The unique identifier for the job
* @return Option containing the job if found, None otherwise
*/
def getJob(jobId: String): Option[Job]

/** Retrieves all jobs stored in the system.
*
* @return List of all jobs
*/
def getAllJobs: List[Job]

/** Updates the status of a job with the given ID.
*
* @param jobId The unique identifier for the job
* @param status The new status of the job
*/
def updateJobStatus(jobId: String, status: JobStatusType): Unit
}

/** In-memory implementation of KVStore for testing and development.
*/
class InMemoryKVStore extends KVStore {
private val store = scala.collection.mutable.Map[String, Job]()

override def storeJob(jobId: String, job: Job): Unit = {
store.put(jobId, job)
}

override def getJob(jobId: String): Option[Job] = {
store.get(jobId)
}

override def getAllJobs: List[Job] = {
store.values.toList
}

override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
store.get(jobId).foreach { job =>
job.jobInfo.setCurrentStatus(status)
}
}
}
Comment on lines +45 to +65

⚠️ Potential issue

Make InMemoryKVStore thread-safe.

Current implementation uses mutable map without synchronization, potentially leading to race conditions.

 class InMemoryKVStore extends KVStore {
-  private val store = scala.collection.mutable.Map[String, Job]()
+  private val store = new java.util.concurrent.ConcurrentHashMap[String, Job]()

   override def storeJob(jobId: String, job: Job): Unit = {
-    store.put(jobId, job)
+    store.put(jobId, job)
   }

   override def getJob(jobId: String): Option[Job] = {
-    store.get(jobId)
+    Option(store.get(jobId))
   }

   override def getAllJobs: List[Job] = {
-    store.values.toList
+    import scala.jdk.CollectionConverters._
+    store.values().asScala.toList
   }

   override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
-    store.get(jobId).foreach { job =>
-      job.jobInfo.setCurrentStatus(status)
+    Option(store.get(jobId)).foreach { job =>
+      // Create updated job with new status to avoid race conditions
+      val updatedJob = job.deepCopy()
+      updatedJob.jobInfo.setCurrentStatus(status)
+      store.put(jobId, updatedJob)
     }
   }
 }


object KVStore {

/** Factory method to create an in-memory KVStore instance.
* @return An in-memory KVStore implementation
*/
def createInMemory(): KVStore = new InMemoryKVStore()
}
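On the thread-safety suggestion above: switching to a `ConcurrentHashMap` makes individual calls safe, but a `get` followed by a `put` is still two separate operations, so two concurrent status updates for the same job can overwrite each other. A sketch of an atomic update using `ConcurrentHashMap.computeIfPresent` follows. It uses a small immutable `JobRecord` case class as a stand-in for the Thrift `Job`; both `JobRecord` and `AtomicJobStore` are hypothetical names, not from the PR.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Stand-in for the Thrift-generated Job; immutable, so readers never
// observe a partially updated value.
final case class JobRecord(jobId: String, status: String)

final class AtomicJobStore {
  private val store = new ConcurrentHashMap[String, JobRecord]()

  def storeJob(jobId: String, job: JobRecord): Unit = {
    require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
    require(job != null, "Job cannot be null")
    store.put(jobId, job)
  }

  def getJob(jobId: String): Option[JobRecord] = Option(store.get(jobId))

  // Returns true if the job existed and its status was replaced.
  def updateJobStatus(jobId: String, status: String): Boolean = {
    val updater = new BiFunction[String, JobRecord, JobRecord] {
      override def apply(key: String, current: JobRecord): JobRecord =
        current.copy(status = status)
    }
    // computeIfPresent performs the read-modify-write atomically for this key.
    store.computeIfPresent(jobId, updater) != null
  }
}
```

Returning a `Boolean` from `updateJobStatus` is a design choice of this sketch; the PR's trait returns `Unit`.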
@@ -0,0 +1,109 @@
package ai.chronon.orchestration.agent.handlers

import ai.chronon.api.{Job, JobInfo}
import ai.chronon.orchestration.agent.{AgentConfig, JobExecutionService, KVStore}
import io.vertx.core.{AsyncResult, Handler}
import io.vertx.core.buffer.Buffer
import io.vertx.core.json.{JsonArray, JsonObject}
import io.vertx.ext.web.client.{HttpResponse, WebClient}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success, Try}

/** Handler for job polling operations. */
class JobPollingHandler(
webClient: WebClient,
kvStore: KVStore,
jobExecutionService: JobExecutionService
) extends Handler[java.lang.Long] {
private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler])

/** Vert.x Handler implementation that gets called periodically.
*
* @param timerId The ID of the timer that triggered this handler
*/
override def handle(timerId: java.lang.Long): Unit = {
pollForJobs()
}

/** Polls the orchestration service for new jobs. */
private def pollForJobs(): Unit = {
logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}")

try {
val host = AgentConfig.orchestrationServiceHostname
val port = AgentConfig.orchestrationServicePort

logger.info(s"Connecting to host:$host at port:$port")

// Make HTTP request to orchestration service
webClient
.get(port, host, "/jobs/list")
.addQueryParam("topic_id", AgentConfig.topicId)
.send(createJobPollingResponseHandler())
} catch {
case e: Exception =>
logger.error("Error while polling for jobs", e)
}
}

/** Creates a response handler for job polling HTTP requests */
private def createJobPollingResponseHandler(): Handler[AsyncResult[HttpResponse[Buffer]]] = {
(ar: AsyncResult[HttpResponse[Buffer]]) =>
{
if (ar.succeeded()) {
val response = ar.result()
if (response.statusCode() == 200) {
val responseBody = response.bodyAsJsonObject()
processJobsResponse(responseBody)
} else {
logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}")
}
} else {
logger.error("Failed to poll for jobs", ar.cause())
}
}
}

/** Processes the response from the orchestration service containing jobs. */
private def processJobsResponse(responseJson: JsonObject): Unit = {
Try {
// Extract jobs to start
val jobsToStartJson = responseJson.getJsonArray("jobsToStart", new JsonArray())
val jobsToStart = (0 until jobsToStartJson.size()).map { i =>
val jobJson = jobsToStartJson.getJsonObject(i)

// Create JobInfo
val jobInfo = new JobInfo()
jobInfo.setJobId(jobJson.getString("jobId"))
// Set other job info properties as needed

// Create Job with JobInfo
val job = new Job()
job.setJobInfo(jobInfo)
// Set other job properties as needed
Comment on lines +76 to +84

💡 Verification agent

🧩 Analysis chain

Incomplete job creation implementation.

Comments indicate missing properties.


🏁 Script executed:

#!/bin/bash
# Verify full JobInfo and Job structure requirements
rg -A 5 "class JobInfo|class Job" --type scala

Length of output: 6986


Action: Fully initialize Job and JobInfo.
The current implementation only sets the jobId, with placeholder comments for other properties. Please update the code based on the complete specifications of JobInfo and Job to ensure that all mandatory fields are appropriately initialized.


job
}

logger.info(s"Received ${jobsToStart.size} jobs to start")

// Process each job
jobsToStart.foreach { job =>
// TODO: To handle all other cases
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) {
// Store job in KV store
kvStore.storeJob(job.jobInfo.getJobId, job)

// Submit job to cluster
jobExecutionService.submitJob(job)
}
Comment on lines +94 to +100

🛠️ Refactor suggestion

Job submission lacks error handling.

Add try-catch around store and submit operations.

- // Store job in KV store
- kvStore.storeJob(job.jobInfo.getJobId, job)
-
- // Submit job to cluster
- jobExecutionService.submitJob(job)
+ try {
+   // Store job in KV store
+   kvStore.storeJob(job.jobInfo.getJobId, job)
+
+   // Submit job to cluster
+   jobExecutionService.submitJob(job)
+   logger.debug(s"Successfully submitted job ${job.jobInfo.getJobId}")
+ } catch {
+   case e: Exception =>
+     logger.error(s"Failed to store/submit job ${job.jobInfo.getJobId}", e)
+ }

}
} match {
case Success(_) =>
logger.info("Successfully processed jobs response")
case Failure(e) =>
logger.error("Error processing jobs response", e)
}
}
}
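One consequence of the store-then-submit order in `processJobsResponse`: if `submitJob` throws after `storeJob` has succeeded, the job stays in the store, and later polls skip it because `getJob` is no longer empty. The sketch below shows one way to handle that by removing the stored entry when submission fails, so the next poll can retry. It uses minimal stand-in types (`SimpleStore`, `Submitter` and `JobIntake` are hypothetical), and retrying on the next poll is an assumption about the desired behavior, not something the PR specifies.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Minimal stand-ins for the PR's KVStore and JobExecutionService.
final class SimpleStore {
  private val jobs = mutable.Map[String, String]()
  def contains(jobId: String): Boolean = jobs.contains(jobId)
  def store(jobId: String, payload: String): Unit = jobs.put(jobId, payload)
  def remove(jobId: String): Unit = jobs.remove(jobId)
}

trait Submitter {
  def submit(jobId: String): Unit
}

object JobIntake {
  // Stores and submits each unseen job; returns the IDs submitted successfully.
  def process(jobIds: Seq[String], store: SimpleStore, submitter: Submitter): List[String] = {
    val submitted = mutable.ListBuffer[String]()
    jobIds.foreach { jobId =>
      if (!store.contains(jobId)) {
        store.store(jobId, "payload")
        Try(submitter.submit(jobId)) match {
          case Success(_) => submitted += jobId
          case Failure(_) =>
            // Roll back so the job is not silently dropped; a later poll can retry it.
            store.remove(jobId)
        }
      }
    }
    submitted.toList
  }
}
```

Each job is handled independently, so one failing submission does not abort the rest of the batch.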
@@ -0,0 +1,95 @@
package ai.chronon.orchestration.agent.handlers

import ai.chronon.api.JobStatusType
import ai.chronon.orchestration.agent.{AgentConfig, JobExecutionService, KVStore}
import io.vertx.core.{AsyncResult, Handler}
import io.vertx.core.buffer.Buffer
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.client.{HttpResponse, WebClient}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success, Try}

/** Handler for status reporting operations. */
class StatusReportingHandler(
webClient: WebClient,
kvStore: KVStore,
jobExecutionService: JobExecutionService
) extends Handler[java.lang.Long] {
private val logger: Logger = LoggerFactory.getLogger(classOf[StatusReportingHandler])

/** Vert.x Handler implementation that gets called periodically.
*
* @param timerId The ID of the timer that triggered this handler
*/
override def handle(timerId: java.lang.Long): Unit = {
reportJobStatuses()
}

/** Checks the status of all jobs and reports changes. */
private def reportJobStatuses(): Unit = {
logger.debug("Checking for job status updates")

try {
// Get all jobs from KV store
val jobs = kvStore.getAllJobs

// For each job, check current status and report if changed
jobs.foreach { job =>
val jobId = job.jobInfo.getJobId
val currentStatus = job.jobInfo.getCurrentStatus
val latestStatus = jobExecutionService.getJobStatus(jobId)
Comment on lines +39 to +41

⚠️ Potential issue

Missing null checks for job.jobInfo.

Add a null check to prevent NPEs, skipping only the affected job.

 jobs.foreach { job =>
-  val jobId = job.jobInfo.getJobId
-  val currentStatus = job.jobInfo.getCurrentStatus
-  val latestStatus = jobExecutionService.getJobStatus(jobId)
+  if (job.jobInfo == null) {
+    // if/else rather than `return`: a return inside this closure would exit
+    // reportJobStatuses and skip every remaining job
+    logger.warn("Found job with null jobInfo, skipping")
+  } else {
+    val jobId = job.jobInfo.getJobId
+    val currentStatus = job.jobInfo.getCurrentStatus
+    val latestStatus = jobExecutionService.getJobStatus(jobId)
+    // ... rest of the loop body goes inside this else branch
+  }


if (currentStatus != latestStatus) {
logger.info(s"Job $jobId status changed from $currentStatus to $latestStatus")

// Update status in KV store
kvStore.updateJobStatus(jobId, latestStatus)

// Report status change to orchestration service
reportStatusToOrchestration(jobId, latestStatus)
}
}
} catch {
case e: Exception =>
logger.error("Error while reporting job statuses", e)
}
}

/** Reports a job status update to the orchestration service. */
private def reportStatusToOrchestration(jobId: String, status: JobStatusType): Unit = {
// Create status update payload
val statusUpdate = new JsonObject()
.put("jobId", jobId)
.put("status", status.toString)
.put("timestamp", System.currentTimeMillis())

val host = AgentConfig.orchestrationServiceHostname
val port = AgentConfig.orchestrationServicePort

logger.info(s"Connecting to host:$host at port:$port")

// Send status update to orchestration service
webClient
.put(port, host, s"/jobs/$jobId/status")
.sendJsonObject(statusUpdate, createStatusUpdateHandler(jobId, status))
}

/** Creates a response handler for status update HTTP requests */
private def createStatusUpdateHandler(
jobId: String,
status: JobStatusType): Handler[AsyncResult[HttpResponse[Buffer]]] = { (ar: AsyncResult[HttpResponse[Buffer]]) =>
{
if (ar.succeeded()) {
val response = ar.result()
if (response.statusCode() == 200) {
logger.info(s"Successfully reported status update for job $jobId: $status")
} else {
logger.error(s"Failed to report status update: ${response.statusCode()} ${response.statusMessage()}")
}
} else {
logger.error("Failed to report status update", ar.cause())
}
}
}
}
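The core of `reportJobStatuses` is a comparison between the stored status and the latest status of each job. A sketch of that step as a pure function is shown below. It skips jobs without job info by wrapping the field in `Option`, which avoids `return` inside a `foreach` closure (there, `return` exits the enclosing method rather than the current iteration). `Info`, `StoredJob`, `StatusChange` and `StatusDiff` are hypothetical stand-ins for the Thrift types.

```scala
// Stand-ins for the Thrift types; jobInfo may be null, as with generated classes.
final case class Info(jobId: String, currentStatus: String)
final case class StoredJob(jobInfo: Info)
final case class StatusChange(jobId: String, from: String, to: String)

object StatusDiff {
  // Returns one StatusChange per job whose latest status differs from the stored one.
  def changes(jobs: Seq[StoredJob], latestStatus: String => String): Seq[StatusChange] =
    for {
      job <- jobs
      info <- Option(job.jobInfo).toList // jobs with a null jobInfo are skipped
      latest = latestStatus(info.jobId)
      if latest != info.currentStatus
    } yield StatusChange(info.jobId, info.currentStatus, latest)
}
```

Separating the comparison from the side effects (updating the store, sending the HTTP request) makes the change-detection logic testable without a Vert.x client.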