Skip to content

Initial skeleton code for Agent service #651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Apr 18, 2025
Merged

Conversation

kumar-zlai
Copy link
Contributor

@kumar-zlai kumar-zlai commented Apr 16, 2025

Summary

Skeleton implementation for Agent service with job polling and status reporting happening at regular intervals which can be configured. A lot of place holder code which is going to change a lot in coming PR's so didn't add tests for now, will add them in coming PR's

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced configuration management for agent components, including polling intervals and service endpoints.
    • Added abstractions and in-memory implementations for job execution and key-value storage services.
    • Implemented periodic job polling and status reporting handlers for orchestration agents.
    • Added Vert.x verticles to manage job polling and status reporting workflows.
  • Improvements

    • Enhanced job struct to use a detailed job info field.
    • Expanded job status types with an explicit "unknown" state and updated status codes.
  • Dependency Updates

    • Included additional Vert.x dependencies for URI template support.
    • Packaged resource files into the orchestration binary.
  • Bug Fixes

    • Updated job handling logic and tests to reflect changes in job ID access patterns.
  • Tests

    • Adjusted tests to accommodate the new job info structure.

… single responsibility for better maintainance with unit/integration tests
… single responsibility for better maintainance with unit/integration tests
Copy link

coderabbitai bot commented Apr 16, 2025

Walkthrough

This update refactors the job orchestration system by introducing new Scala components for job polling, status reporting, and in-memory job execution and storage. The Thrift API changes replace the jobId string with a JobInfo struct and add an explicit UNKNOWN status to the job status enum, shifting existing values. Build files are updated to include resource files and a new Vert.x dependency. Handler and test code are adapted to the new job structure and access patterns. Vert.x verticles manage periodic polling and reporting with dependency injection and lifecycle control.

Changes

File(s) Change Summary
api/thrift/agent.thrift Replaced jobId string in Job struct with optional jobInfo of type JobInfo. Added UNKNOWN=0 to JobStatusType enum; shifted existing statuses by one.
orchestration/BUILD.bazel Added resources = glob(["src/main/resources/**/*"]) to orchestration_assembly JVM binary target.
orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala Added AgentConfig object with constants for orchestration host, port, polling and reporting intervals, and PubSub topic ID.
orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala Added JobExecutionService trait and InMemoryJobExecutionService class with job submission and status retrieval methods; includes factory method.
orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala Added KVStore trait and InMemoryKVStore class for job storage and status updates; includes factory method.
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala Added JobPollingHandler class implementing Vert.x Handler to poll jobs from orchestration service, store new jobs, and submit them for execution; includes error handling and logging.
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala Added StatusReportingHandler class implementing Vert.x Handler to check job status changes and report updates asynchronously to orchestration service; includes error handling and logging.
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala Added JobPollingVerticle class managing periodic job polling lifecycle, initialization of dependencies, and clean shutdown; includes companion object factory method for dependency injection.
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala Added StatusReportingVerticle class managing periodic job status reporting lifecycle with dependency injection and clean shutdown; includes companion object factory method.
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/JobsHandler.scala Updated to import JobInfo and create Job instances with nested jobInfo field instead of direct jobId.
orchestration/src/test/scala/ai/chronon/orchestration/test/service/handlers/JobsHandlerSpec.scala Updated test assertions to access job IDs via nested jobInfo property reflecting struct changes.
tools/build_rules/prelude_bazel Added io.vertx:vertx-uri-template Maven artifact to Vert.x core and test dependencies.

Sequence Diagram(s)

sequenceDiagram
  participant Agent as Agent Verticle
  participant Orchestration as Orchestration Service
  participant KV as KVStore
  participant Exec as JobExecutionService

  Agent->>Orchestration: GET /jobs/list?topic_id=...
  Orchestration-->>Agent: JSON list of jobs
  Agent->>KV: Check if job exists
  alt Job not in KV
    Agent->>KV: Store job
    Agent->>Exec: submitJob(job)
  end
Loading
sequenceDiagram
  participant Agent as StatusReportingVerticle
  participant KV as KVStore
  participant Exec as JobExecutionService
  participant Orchestration as Orchestration Service

  Agent->>KV: getAllJobs
  loop For each job
    Agent->>Exec: getJobStatus(jobId)
    alt Status changed
      Agent->>KV: updateJobStatus(jobId, newStatus)
      Agent->>Orchestration: PUT /jobs/status (jobId, newStatus, timestamp)
      Orchestration-->>Agent: 200 OK or error
    end
  end
Loading

Suggested reviewers

  • piyush-zlai
  • nikhil-zlai

Poem

🚦
New jobs now poll, and statuses report,
With in-memory stores to safely transport.
Handlers and verticles, all in a spin,
Awaiting new tasks, let the orchestration begin!
Thrift structs refashioned, enums realigned—
This code’s a conductor, with jobs well-timed.
🎶

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (10)
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (2)

82-93: Add immediate initial check.

Status reporting only occurs at intervals, without immediate execution.

   private def startStatusReporting(startPromise: Promise[Void]): Unit = {
     if (isReporting.compareAndSet(false, true)) {
       logger.info("Starting status reporting")
 
+      // Run initial status check immediately
+      statusReportingHandler.handle(0L)
+
       // Set up periodic status reporting with handler
       reportingTimerId = vertx.setPeriodic(AgentConfig.statusReportingIntervalMs, statusReportingHandler)
 
       startPromise.complete()
     } else {
       startPromise.complete()
     }
   }

118-126: Allow WebClient injection for testing.

The factory method supports KVStore and JobExecutionService injection, but not WebClient.

   def createWithDependencies(
       kvStore: KVStore,
-      jobExecutionService: JobExecutionService
+      jobExecutionService: JobExecutionService,
+      webClient: WebClient = null
   ): StatusReportingVerticle = {
     val verticle = new StatusReportingVerticle()
     verticle.kvStore = kvStore
     verticle.jobExecutionService = jobExecutionService
+    if (webClient != null) {
+      verticle.webClient = webClient
+    }
     verticle
   }
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (2)

58-62: URL should be configurable, not hardcoded.

Extract the endpoint path to configuration.

+  private val jobsListEndpoint = "/jobs/list"  // Move to AgentConfig in future updates

  // Make HTTP request to orchestration service
  webClient
-    .get(port, host, "/jobs/list")
+    .get(port, host, jobsListEndpoint)
    .addQueryParam("topic_id", AgentConfig.topicId)

90-90: Missing validation for empty jobs list.

Consider adding early return or debug log.

  val jobsToStart = (0 until jobsToStartJson.size()).map { i =>
    // ...
  }

  logger.info(s"Received ${jobsToStart.size} jobs to start")
+ 
+ if (jobsToStart.isEmpty) {
+   logger.debug("No jobs to start, returning")
+   return
+ }
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (2)

86-86: Excessive logging on each status update.

This log generates noise for every status update.

- logger.info(s"Connecting to host:$host at port:$port")
+ logger.debug(s"Connecting to host:$host at port:$port for job $jobId")

33-56: Consider batching status updates.

For efficiency, batch multiple status updates.

Implement a batching mechanism that collects status changes and sends them in a single request, reducing network overhead for multiple simultaneous status changes.

orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (2)

21-26: Method comment and return type mismatch.

Comment says "as a string" but returns JobStatusType.

/** Gets the current status of a job.
  *
  * @param jobId The unique identifier for the job
- * @return The status of the job as a string
+ * @return The status of the job as JobStatusType
  */
def getJobStatus(jobId: String): JobStatusType

10-12: TODO comment needs to be addressed in future PRs.

Ensure this is tracked in your issue tracker.

Would you like me to create a GitHub issue to track extending this service for GCP and AWS implementations?

orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (2)

80-91: Potential race condition in startPolling.

Use compareAndSet correctly but code flow is confusing.

private def startPolling(startPromise: Promise[Void]): Unit = {
  if (isPolling.compareAndSet(false, true)) {
    logger.info("Starting job polling")

    // Set up periodic polling with handler
    pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)

    startPromise.complete()
  } else {
+    logger.info("Job polling already active, not starting again")
    startPromise.complete()
  }
}

46-68: Consider Constructor Dependency Injection.

Current design has mutable fields set after construction.

A cleaner approach would be to use constructor dependency injection instead of field initialization. This would make dependencies explicit and improve testability while reducing mutability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between c61aa3b and a3472a8.

📒 Files selected for processing (12)
  • api/thrift/agent.thrift (2 hunks)
  • orchestration/BUILD.bazel (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/JobsHandler.scala (2 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/service/handlers/JobsHandlerSpec.scala (2 hunks)
  • tools/build_rules/prelude_bazel (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (2)
api/src/main/scala/ai/chronon/api/ParametricMacro.scala (1)
  • api (66-66)
orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1)
  • createInMemory (72-72)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (13)
tools/build_rules/prelude_bazel (2)

92-92: Added Vert.x URI Template dependency.

The addition of the vertx-uri-template dependency supports URI parsing in the new Agent service.


102-102: Added Vert.x URI Template test dependency.

Consistent addition of the dependency for test environment.

api/thrift/agent.thrift (2)

77-77: Updated Job struct to reference JobInfo.

Good refactoring to support more comprehensive job metadata.


95-100:

❓ Verification inconclusive

Added explicit UNKNOWN status and shifted enum values.

Adding UNKNOWN status improves API robustness, but shifting enum values could break existing code.


🏁 Script executed:

#!/bin/bash
# Find occurrences of JobStatusType enum values being used directly as integers
rg -A 3 -B 3 "\bJobStatusType\b.*\b[0-9]+" --type scala

Length of output: 57


Action: Verify enum value usage across clients.

  • The api/thrift/agent.thrift enum now starts with UNKNOWN = 0, shifting subsequent values.
  • Automated search for direct integer usage in Scala produced no output, but this is inconclusive.
  • Ensure manual review of all consumers of JobStatusType to avoid unexpected breakage.
orchestration/BUILD.bazel (1)

115-115: Added resources inclusion to assembly.

Essential addition for packaging configuration files needed by the Agent service.

orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/JobsHandler.scala (2)

4-4: Added JobInfo import.

Required for the new job structure.


45-49: Updated job creation to use JobInfo.

Implementation properly adapts to the new Thrift structure.

orchestration/src/test/scala/ai/chronon/orchestration/test/service/handlers/JobsHandlerSpec.scala (2)

68-69: Updated job ID access pattern correctly.

Code accesses jobId via the new jobInfo structure, aligning with the Thrift API changes.


96-96: Job ID access correctly updated.

Updated to access jobId through jobInfo structure, consistent with the data model changes.

orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1)

3-6: TODO comment needs implementation plan.

The comment indicates these configurations need to be moved to a client-specific file.

Do you plan to implement this configuration file in a follow-up PR? Consider adding a tracking issue to your project management system.

orchestration/src/main/scala/ai/chronon/orchestration/agent/KVStore.scala (1)

5-12: Documentation clearly states implementation status.

The TODO comment provides context that this is a placeholder implementation.

orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala (1)

87-87:

❓ Verification inconclusive

Verify StatusReportingHandler implements Handler.

StatusReportingHandler is directly used with setPeriodic, ensure it implements Handler.

Run this script to check the implementation:


🏁 Script executed:

#!/bin/bash
# Check if StatusReportingHandler implements Handler<Long>
ast-grep --pattern 'class StatusReportingHandler$_$ implements Handler<Long>' || echo "StatusReportingHandler doesn't implement Handler<Long>"

Length of output: 79


Verify StatusReportingHandler Conformance

Please ensure that StatusReportingHandler extends Handler[Long] as required by its use in setPeriodic. The previous automated check was inconclusive; a manual verification is advised.

  • File: orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/StatusReportingVerticle.scala, line 87
orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1)

93-108: Good cleanup in stop method.

Resources are properly released on shutdown.

Comment on lines 9 to 23
/** Hostname for the orchestration service */
val orchestrationServiceHostname: String = "localhost"

/** Port for the orchestration service */
val orchestrationServicePort: Int = 3903

/** Interval for polling jobs from the orchestration service (in milliseconds) */
val pollingIntervalMs: Long = 5 * 60 * 1000

/** Interval for reporting job status to the orchestration service (in milliseconds) */
val statusReportingIntervalMs: Long = 5 * 60 * 1000

/** PubSub topic id to pull job requests from */
val topicId: String = "test-topic"
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace hardcoded values with configurable parameters.

Hardcoded values limit flexibility across different environments.

-  /** Hostname for the orchestration service */
-  val orchestrationServiceHostname: String = "localhost"
-
-  /** Port for the orchestration service */
-  val orchestrationServicePort: Int = 3903
-
-  /** Interval for polling jobs from the orchestration service (in milliseconds) */
-  val pollingIntervalMs: Long = 5 * 60 * 1000
-
-  /** Interval for reporting job status to the orchestration service (in milliseconds) */
-  val statusReportingIntervalMs: Long = 5 * 60 * 1000
-
-  /** PubSub topic id to pull job requests from */
-  val topicId: String = "test-topic"
+  /** Hostname for the orchestration service */
+  val orchestrationServiceHostname: String = sys.env.getOrElse("ORCHESTRATION_SERVICE_HOSTNAME", "localhost")
+
+  /** Port for the orchestration service */
+  val orchestrationServicePort: Int = sys.env.getOrElse("ORCHESTRATION_SERVICE_PORT", "3903").toInt
+
+  /** Interval for polling jobs from the orchestration service (in milliseconds) */
+  val pollingIntervalMs: Long = sys.env.getOrElse("POLLING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
+
+  /** Interval for reporting job status to the orchestration service (in milliseconds) */
+  val statusReportingIntervalMs: Long = sys.env.getOrElse("STATUS_REPORTING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
+
+  /** PubSub topic id to pull job requests from */
+  val topicId: String = sys.env.getOrElse("TOPIC_ID", "test-topic")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/** Hostname for the orchestration service */
val orchestrationServiceHostname: String = "localhost"
/** Port for the orchestration service */
val orchestrationServicePort: Int = 3903
/** Interval for polling jobs from the orchestration service (in milliseconds) */
val pollingIntervalMs: Long = 5 * 60 * 1000
/** Interval for reporting job status to the orchestration service (in milliseconds) */
val statusReportingIntervalMs: Long = 5 * 60 * 1000
/** PubSub topic id to pull job requests from */
val topicId: String = "test-topic"
}
/** Hostname for the orchestration service */
val orchestrationServiceHostname: String = sys.env.getOrElse("ORCHESTRATION_SERVICE_HOSTNAME", "localhost")
/** Port for the orchestration service */
val orchestrationServicePort: Int = sys.env.getOrElse("ORCHESTRATION_SERVICE_PORT", "3903").toInt
/** Interval for polling jobs from the orchestration service (in milliseconds) */
val pollingIntervalMs: Long = sys.env.getOrElse("POLLING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
/** Interval for reporting job status to the orchestration service (in milliseconds) */
val statusReportingIntervalMs: Long = sys.env.getOrElse("STATUS_REPORTING_INTERVAL_MS", (5 * 60 * 1000).toString).toLong
/** PubSub topic id to pull job requests from */
val topicId: String = sys.env.getOrElse("TOPIC_ID", "test-topic")
}

Comment on lines +45 to +65
class InMemoryKVStore extends KVStore {
private val store = scala.collection.mutable.Map[String, Job]()

override def storeJob(jobId: String, job: Job): Unit = {
store.put(jobId, job)
}

override def getJob(jobId: String): Option[Job] = {
store.get(jobId)
}

override def getAllJobs: List[Job] = {
store.values.toList
}

override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
store.get(jobId).foreach { job =>
job.jobInfo.setCurrentStatus(status)
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make InMemoryKVStore thread-safe.

Current implementation uses mutable map without synchronization, potentially leading to race conditions.

 class InMemoryKVStore extends KVStore {
-  private val store = scala.collection.mutable.Map[String, Job]()
+  private val store = new java.util.concurrent.ConcurrentHashMap[String, Job]()

   override def storeJob(jobId: String, job: Job): Unit = {
-    store.put(jobId, job)
+    store.put(jobId, job)
   }

   override def getJob(jobId: String): Option[Job] = {
-    store.get(jobId)
+    Option(store.get(jobId))
   }

   override def getAllJobs: List[Job] = {
-    store.values.toList
+    import scala.jdk.CollectionConverters._
+    store.values().asScala.toList
   }

   override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
-    store.get(jobId).foreach { job =>
-      job.jobInfo.setCurrentStatus(status)
+    Option(store.get(jobId)).foreach { job =>
+      // Create updated job with new status to avoid race conditions
+      val updatedJob = job.deepCopy()
+      updatedJob.jobInfo.setCurrentStatus(status)
+      store.put(jobId, updatedJob)
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class InMemoryKVStore extends KVStore {
private val store = scala.collection.mutable.Map[String, Job]()
override def storeJob(jobId: String, job: Job): Unit = {
store.put(jobId, job)
}
override def getJob(jobId: String): Option[Job] = {
store.get(jobId)
}
override def getAllJobs: List[Job] = {
store.values.toList
}
override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
store.get(jobId).foreach { job =>
job.jobInfo.setCurrentStatus(status)
}
}
}
class InMemoryKVStore extends KVStore {
private val store = new java.util.concurrent.ConcurrentHashMap[String, Job]()
override def storeJob(jobId: String, job: Job): Unit = {
store.put(jobId, job)
}
override def getJob(jobId: String): Option[Job] = {
Option(store.get(jobId))
}
override def getAllJobs: List[Job] = {
import scala.jdk.CollectionConverters._
store.values().asScala.toList
}
override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
Option(store.get(jobId)).foreach { job =>
// Create updated job with new status to avoid race conditions
val updatedJob = job.deepCopy()
updatedJob.jobInfo.setCurrentStatus(status)
store.put(jobId, updatedJob)
}
}
}

Comment on lines +19 to +20
*/
def storeJob(jobId: String, job: Job): Unit
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

❓ Verification inconclusive

Add validation for inputs.

No checks for null or empty jobId.

   /** Stores a job with the given ID.
     *
     * @param jobId The unique identifier for the job
     * @param job The job object to store
     */
-  def storeJob(jobId: String, job: Job): Unit
+  def storeJob(jobId: String, job: Job): Unit
// Implementation example:
override def storeJob(jobId: String, job: Job): Unit = {
  require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
  require(job != null, "Job cannot be null")
  // ... rest of implementation
}

Input validation required:
Please add inline validations to ensure that jobId is neither null nor empty and that job is not null. For example:

override def storeJob(jobId: String, job: Job): Unit = {
  require(jobId != null && jobId.nonEmpty, "Job ID cannot be null or empty")
  require(job != null, "Job cannot be null")
  // ... rest of implementation
}

Comment on lines +95 to +101
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) {
// Store job in KV store
kvStore.storeJob(job.jobInfo.getJobId, job)

// Submit job to cluster
jobExecutionService.submitJob(job)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Job submission lacks error handling.

Add try-catch around store and submit operations.

- // Store job in KV store
- kvStore.storeJob(job.jobInfo.getJobId, job)
-
- // Submit job to cluster
- jobExecutionService.submitJob(job)
+ try {
+   // Store job in KV store
+   kvStore.storeJob(job.jobInfo.getJobId, job)
+
+   // Submit job to cluster
+   jobExecutionService.submitJob(job)
+   logger.debug(s"Successfully submitted job ${job.jobInfo.getJobId}")
+ } catch {
+   case e: Exception =>
+     logger.error(s"Failed to store/submit job ${job.jobInfo.getJobId}", e)
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) {
// Store job in KV store
kvStore.storeJob(job.jobInfo.getJobId, job)
// Submit job to cluster
jobExecutionService.submitJob(job)
}
if (kvStore.getJob(job.jobInfo.getJobId).isEmpty) {
try {
// Store job in KV store
kvStore.storeJob(job.jobInfo.getJobId, job)
// Submit job to cluster
jobExecutionService.submitJob(job)
logger.debug(s"Successfully submitted job ${job.jobInfo.getJobId}")
} catch {
case e: Exception =>
logger.error(s"Failed to store/submit job ${job.jobInfo.getJobId}", e)
}
}

Comment on lines +77 to +85
// Create JobInfo
val jobInfo = new JobInfo()
jobInfo.setJobId(jobJson.getString("jobId"))
// Set other job info properties as needed

// Create Job with JobInfo
val job = new Job()
job.setJobInfo(jobInfo)
// Set other job properties as needed
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Incomplete job creation implementation.

Comments indicate missing properties.


🏁 Script executed:

#!/bin/bash
# Verify full JobInfo and Job structure requirements
rg -A 5 "class JobInfo|class Job" --type scala

Length of output: 6986


Action: Fully initialize Job and JobInfo.
The current implementation only sets the jobId, with placeholder comments for other properties. Please update the code based on the complete specifications of JobInfo and Job to ensure that all mandatory fields are appropriately initialized.

Comment on lines +39 to +41
val jobId = job.jobInfo.getJobId
val currentStatus = job.jobInfo.getCurrentStatus
val latestStatus = jobExecutionService.getJobStatus(jobId)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing null checks for job.jobInfo.

Add null checks to prevent NPEs.

jobs.foreach { job =>
-  val jobId = job.jobInfo.getJobId
-  val currentStatus = job.jobInfo.getCurrentStatus
+  if (job.jobInfo == null) {
+    logger.warn("Found job with null jobInfo, skipping")
+    return
+  }
+  val jobId = job.jobInfo.getJobId
+  val currentStatus = job.jobInfo.getCurrentStatus
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val jobId = job.jobInfo.getJobId
val currentStatus = job.jobInfo.getCurrentStatus
val latestStatus = jobExecutionService.getJobStatus(jobId)
jobs.foreach { job =>
if (job.jobInfo == null) {
logger.warn("Found job with null jobInfo, skipping")
return
}
val jobId = job.jobInfo.getJobId
val currentStatus = job.jobInfo.getCurrentStatus
val latestStatus = jobExecutionService.getJobStatus(jobId)
}

Comment on lines 34 to 37
override def submitJob(job: Job): Unit = {
// Simulate job submission
jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add null check for job.jobInfo.

Prevent NPE if jobInfo is null.

override def submitJob(job: Job): Unit = {
  // Simulate job submission
-  jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
+  if (job != null && job.jobInfo != null) {
+    jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
+  } else {
+    throw new IllegalArgumentException("Job or JobInfo cannot be null")
+  }
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
override def submitJob(job: Job): Unit = {
// Simulate job submission
jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
}
override def submitJob(job: Job): Unit = {
// Simulate job submission
if (job != null && job.jobInfo != null) {
jobStatuses.put(job.jobInfo.getJobId, JobStatusType.RUNNING)
} else {
throw new IllegalArgumentException("Job or JobInfo cannot be null")
}
}

Comment on lines 39 to 42
override def getJobStatus(jobId: String): JobStatusType = {
// Return job status or default to "UNKNOWN"
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Missing validation for jobId parameter.

Add check for null or empty jobId.

override def getJobStatus(jobId: String): JobStatusType = {
  // Return job status or default to "UNKNOWN"
+  if (jobId == null || jobId.isEmpty) {
+    return JobStatusType.UNKNOWN
+  }
  jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
override def getJobStatus(jobId: String): JobStatusType = {
// Return job status or default to "UNKNOWN"
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}
override def getJobStatus(jobId: String): JobStatusType = {
// Return job status or default to "UNKNOWN"
if (jobId == null || jobId.isEmpty) {
return JobStatusType.UNKNOWN
}
jobStatuses.getOrElse(jobId, JobStatusType.UNKNOWN)
}

logger.info("Starting job polling")

// Set up periodic polling with handler
pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Configuration should be more explicit.

Don't rely on global AgentConfig.

// Set up periodic polling with handler
-pollingTimerId = vertx.setPeriodic(AgentConfig.pollingIntervalMs, jobPollingHandler)
+final long pollingInterval = AgentConfig.pollingIntervalMs;
+logger.info(s"Setting polling interval to $pollingInterval ms");
+pollingTimerId = vertx.setPeriodic(pollingInterval, jobPollingHandler)

Committable suggestion skipped: line range outside the PR's diff.

@kumar-zlai kumar-zlai requested a review from chewy-zlai April 17, 2025 16:18
Copy link
Contributor

@piyush-zlai piyush-zlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Largely looks good, left a few cosmetic / nit shaped comments

*/
object AgentConfig {

/** Hostname for the orchestration service */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - these comments don't add a ton as your field names are pretty descriptive as is. So you could drop them for brevity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* This service provides abstraction for job submission and status management,
* allowing the application to work with different execution backends.
*
* TODO: To extend this further for handling all types of jobs and have
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove this as it feels like a useful comment more for the PR description. For the code itself the todo could highlight specific methods / aspects that are missing that you want to address

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS
* Just a place holder will change a lot in coming PRs
*/
trait KVStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we instead call this AgentStore or something more specific than KV store? The interface isn't generic enough to be a kv store (unlike the KVStore trait we have in api which is very generic..)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense. Addressed this in the later PR #668

* This service provides abstraction for key-value storage operations,
* allowing the application to work with different storage implementations.
*
* TODO: To use our chronon KVStore, extend if needed and have implementations for both GCP and AWS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice if we could inject that KV store here (if we know the apis match what we need)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this in the later PR #668

logger.info("Polling for jobs")

try {
logger.info(s"topicId: ${AgentConfig.topicId}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - can merge this log line with the one on 31

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

logger.info(s"topicId: ${AgentConfig.topicId}")

// Create a proper response handler
val responseHandler = new Handler[AsyncResult[HttpResponse[Buffer]]] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for readability, can pull this out into a separate method or class level handler field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done.


// Send status update to orchestration service with proper handler
webClient
.put(port, host, "/jobs/$jobId/status")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this uri string have a s"..." ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

private def createKVStore(): KVStore = {
// Use the newly created in-memory implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

private def createJobExecutionService(): JobExecutionService = {
// Use the newly created in-memory implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. In general claude code is adding more comments which are redundant and not needed in most cases, so i had to explicitly tell it, gave an example to follow and remember for all future code and it seems to be doing ok now :)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (2)

76-84: ⚠️ Potential issue

Incomplete job initialization.

Fill in required properties for JobInfo and Job.


94-100: ⚠️ Potential issue

Job submission lacks error handling.

Add try-catch around store and submit operations.

🧹 Nitpick comments (4)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (4)

31-31: Logger information could be combined.

Merge the topic ID log with the connection log.

- logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}")
- 
- try {
-   val host = AgentConfig.orchestrationServiceHostname
-   val port = AgentConfig.orchestrationServicePort
- 
-   logger.info(s"Connecting to host:$host at port:$port")
+ try {
+   val host = AgentConfig.orchestrationServiceHostname
+   val port = AgentConfig.orchestrationServicePort
+ 
+   logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId} using host:$host at port:$port")

33-38: AgentConfig dependency hardcoded.

Inject config for better testability.

class JobPollingHandler(
    webClient: WebClient,
    kvStore: KVStore,
-   jobExecutionService: JobExecutionService
+   jobExecutionService: JobExecutionService,
+   config: AgentConfig
) extends Handler[java.lang.Long] {
  private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler])

  /** Vert.x Handler implementation that gets called periodically.
    *
    * @param timerId The ID of the timer that triggered this handler
    */
  override def handle(timerId: java.lang.Long): Unit = {
    pollForJobs()
  }

  /** Polls the orchestration service for new jobs. */
  private def pollForJobs(): Unit = {
    logger.info(s"Polling for jobs with topicId: ${config.topicId}")

    try {
      val host = config.orchestrationServiceHostname
      val port = config.orchestrationServicePort

51-66: Extract handler to class field.

Move response handler creation to class level.

class JobPollingHandler(
    webClient: WebClient, 
    kvStore: KVStore,
    jobExecutionService: JobExecutionService
) extends Handler[java.lang.Long] {
  private val logger: Logger = LoggerFactory.getLogger(classOf[JobPollingHandler])
+
+  /** Response handler for job polling HTTP requests */
+  private val jobPollingResponseHandler = new Handler[AsyncResult[HttpResponse[Buffer]]] {
+    override def handle(ar: AsyncResult[HttpResponse[Buffer]]): Unit = {
+      if (ar.succeeded()) {
+        val response = ar.result()
+        if (response.statusCode() == 200) {
+          val responseBody = response.bodyAsJsonObject()
+          processJobsResponse(responseBody)
+        } else {
+          logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}")
+        }
+      } else {
+        logger.error("Failed to poll for jobs", ar.cause())
+      }
+    }
+  }

  /** Vert.x Handler implementation that gets called periodically.
    *
    * @param timerId The ID of the timer that triggered this handler
    */
  override def handle(timerId: java.lang.Long): Unit = {
    pollForJobs()
  }

  /** Polls the orchestration service for new jobs. */
  private def pollForJobs(): Unit = {
    logger.info(s"Polling for jobs with topicId: ${AgentConfig.topicId}")

    try {
      val host = AgentConfig.orchestrationServiceHostname
      val port = AgentConfig.orchestrationServicePort

      logger.info(s"Connecting to host:$host at port:$port")

      // Make HTTP request to orchestration service
      webClient
        .get(port, host, "/jobs/list")
        .addQueryParam("topic_id", AgentConfig.topicId)
-       .send(createJobPollingResponseHandler())
+       .send(jobPollingResponseHandler)
    } catch {
      case e: Exception =>
        logger.error("Error while polling for jobs", e)
    }
  }

-  /** Creates a response handler for job polling HTTP requests */
-  private def createJobPollingResponseHandler(): Handler[AsyncResult[HttpResponse[Buffer]]] = {
-    (ar: AsyncResult[HttpResponse[Buffer]]) =>
-      {
-        if (ar.succeeded()) {
-          val response = ar.result()
-          if (response.statusCode() == 200) {
-            val responseBody = response.bodyAsJsonObject()
-            processJobsResponse(responseBody)
-          } else {
-            logger.error(s"Failed to poll for jobs: ${response.statusCode()} ${response.statusMessage()}")
-          }
-        } else {
-          logger.error("Failed to poll for jobs", ar.cause())
-        }
-      }
-  }

103-107: Success log level inconsistency.

Match log levels for better analysis.

  } match {
    case Success(_) =>
-     logger.info("Successfully processed jobs response")
+     logger.debug("Successfully processed jobs response")
    case Failure(e) =>
      logger.error("Error processing jobs response", e)
  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a3472a8 and a6ab0e7.

📒 Files selected for processing (6)
  • api/thrift/agent.thrift (2 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/AgentConfig.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/StatusReportingHandler.scala
  • api/thrift/agent.thrift
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/JobExecutionService.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/agent/verticle/JobPollingVerticle.scala
⏰ Context from checks skipped due to timeout of 90000ms (21)
  • GitHub Check: service_tests
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: hub_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: hub_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: api_tests
  • GitHub Check: api_tests
  • GitHub Check: flink_tests
  • GitHub Check: online_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: flink_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: orchestration_tests
  • GitHub Check: orchestration_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
orchestration/src/main/scala/ai/chronon/orchestration/agent/handlers/JobPollingHandler.scala (1)

92-93: Implement TODO before production use.

The comment indicates missing handling for other cases.

What other cases need to be handled here? Consider adding specific examples in the TODO comment for future implementation.

@kumar-zlai kumar-zlai merged commit 6370b1c into main Apr 18, 2025
25 checks passed
@kumar-zlai kumar-zlai deleted the kumar/agent_service branch April 18, 2025 18:01
kumar-zlai added a commit that referenced this pull request Apr 25, 2025
## Summary
Skeleton implementation for Agent service with job polling and status
reporting happening at regular intervals which can be configured. A lot
of place holder code which is going to change a lot in coming PR's so
didn't add tests for now, will add them in coming PR's

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced configuration management for agent components, including
polling intervals and service endpoints.
- Added abstractions and in-memory implementations for job execution and
key-value storage services.
- Implemented periodic job polling and status reporting handlers for
orchestration agents.
- Added Vert.x verticles to manage job polling and status reporting
workflows.

- **Improvements**
  - Enhanced job struct to use a detailed job info field.
- Expanded job status types with an explicit "unknown" state and updated
status codes.

- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Packaged resource files into the orchestration binary.

- **Bug Fixes**
- Updated job handling logic and tests to reflect changes in job ID
access patterns.

- **Tests**
  - Adjusted tests to accommodate the new job info structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
kumar-zlai added a commit that referenced this pull request Apr 29, 2025
## Summary
Skeleton implementation for Agent service with job polling and status
reporting happening at regular intervals which can be configured. A lot
of place holder code which is going to change a lot in coming PR's so
didn't add tests for now, will add them in coming PR's

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced configuration management for agent components, including
polling intervals and service endpoints.
- Added abstractions and in-memory implementations for job execution and
key-value storage services.
- Implemented periodic job polling and status reporting handlers for
orchestration agents.
- Added Vert.x verticles to manage job polling and status reporting
workflows.

- **Improvements**
  - Enhanced job struct to use a detailed job info field.
- Expanded job status types with an explicit "unknown" state and updated
status codes.

- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Packaged resource files into the orchestration binary.

- **Bug Fixes**
- Updated job handling logic and tests to reflect changes in job ID
access patterns.

- **Tests**
  - Adjusted tests to accommodate the new job info structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
## Summary
Skeleton implementation for Agent service with job polling and status
reporting happening at regular intervals which can be configured. A lot
of place holder code which is going to change a lot in coming PR's so
didn't add tests for now, will add them in coming PR's

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced configuration management for agent components, including
polling intervals and service endpoints.
- Added abstractions and in-memory implementations for job execution and
key-value storage services.
- Implemented periodic job polling and status reporting handlers for
orchestration agents.
- Added Vert.x verticles to manage job polling and status reporting
workflows.

- **Improvements**
  - Enhanced job struct to use a detailed job info field.
- Expanded job status types with an explicit "unknown" state and updated
status codes.

- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Packaged resource files into the orchestration binary.

- **Bug Fixes**
- Updated job handling logic and tests to reflect changes in job ID
access patterns.

- **Tests**
  - Adjusted tests to accommodate the new job info structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
## Summary
Skeleton implementation for Agent service with job polling and status
reporting happening at regular intervals which can be configured. A lot
of place holder code which is going to change a lot in coming PR's so
didn't add tests for now, will add them in coming PR's

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced configuration management for agent components, including
polling intervals and service endpoints.
- Added abstractions and in-memory implementations for job execution and
key-value storage services.
- Implemented periodic job polling and status reporting handlers for
orchestration agents.
- Added Vert.x verticles to manage job polling and status reporting
workflows.

- **Improvements**
  - Enhanced job struct to use a detailed job info field.
- Expanded job status types with an explicit "unknown" state and updated
status codes.

- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Packaged resource files into the orchestration binary.

- **Bug Fixes**
- Updated job handling logic and tests to reflect changes in job ID
access patterns.

- **Tests**
  - Adjusted tests to accommodate the new job info structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
## Summary
Skeleton implementation for Agent service with job polling and status
reporting happening at regular intervals which can be configured. A lot
of place holder code which is going to change a lot in coming PR's so
didn't add tests for now, will add them in coming PR's

## Cheour clientslist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced configuration management for agent components, including
polling intervals and service endpoints.
- Added abstractions and in-memory implementations for job execution and
key-value storage services.
- Implemented periodic job polling and status reporting handlers for
orchestration agents.
- Added Vert.x verticles to manage job polling and status reporting
workflows.

- **Improvements**
  - Enhanced job struct to use a detailed job info field.
- Expanded job status types with an explicit "unknown" state and updated
status codes.

- **Dependency Updates**
  - Included additional Vert.x dependencies for URI template support.
  - Paour clientsaged resource files into the orchestration binary.

- **Bug Fixes**
- Updated job handling logic and tests to reflect changes in job ID
access patterns.

- **Tests**
  - Adjusted tests to accommodate the new job info structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants