package ai.chronon.agent

import ai.chronon.api.{Job, JobStatusType}
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.{GetRequest, PutRequest}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.slf4j.{Logger, LoggerFactory}

import java.nio.charset.StandardCharsets
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success, Try}

/** JobStore implementation backed by a KVStore. Active and completed jobs are kept in
  * separate datasets to reduce range scan costs and keep retrieval of active jobs fast
  * even as completed jobs accumulate.
  */
class KVJobStore(kvStore: KVStore) extends JobStore {

  private val logger: Logger = LoggerFactory.getLogger(getClass)

  // TODO: Move these to config
  private val activeJobsDataset = "AGENT_ACTIVE_JOBS"
  private val completedJobsDataset = "AGENT_COMPLETED_JOBS"
  private val timeoutMs = 10000

  // Ensure the datasets exist (assumes create is safe to call when they already do)
  kvStore.create(activeJobsDataset)
  kvStore.create(completedJobsDataset)

  implicit val ec: ExecutionContext = kvStore.executionContext

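  // Jackson mapper for job (de)serialization; unknown fields are ignored so that jobs
  // written with a newer schema can still be read.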
  private val objectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

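  /** Serializes a job to JSON bytes; errors are logged and rethrown so callers fail loudly. */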
  private def serializeJob(job: Job): Array[Byte] = {
    try {
      objectMapper.writeValueAsBytes(job)
    } catch {
      case e: Exception =>
        logger.error("Error serializing job", e)
        throw e
    }
  }

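  /** Deserializes JSON bytes into a Job, returning Failure rather than throwing so that
    * callers can log and skip corrupt or tombstoned entries.
    */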
  private def deserializeJob(bytes: Array[Byte]): Try[Job] = {
    Try {
      objectMapper.readValue(bytes, classOf[Job])
    }
  }

  private def isJobActive(job: Job): Boolean = {
    val status = job.getJobInfo.getCurrentStatus
    // Jobs are active if they are pending, running, or unknown;
    // they're considered completed if they succeeded, failed, or were cancelled
    status == JobStatusType.PENDING ||
    status == JobStatusType.RUNNING ||
    status == JobStatusType.UNKNOWN
  }

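  /** Persists the job in the dataset matching its current status. A put that reports
    * failure is logged; unexpected exceptions are logged and rethrown.
    */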
  override def storeJob(jobId: String, job: Job): Unit = {
    try {
      val jobBytes = serializeJob(job)
      // Store in the active or completed dataset based on status
      val dataset = if (isJobActive(job)) activeJobsDataset else completedJobsDataset
      val putRequest = PutRequest(jobId.getBytes(StandardCharsets.UTF_8), jobBytes, dataset)

      val result = Await.result(kvStore.put(putRequest), timeoutMs.millis)
      if (!result) {
        logger.error(s"Failed to store job $jobId")
      }
    } catch {
      case e: Exception =>
        logger.error(s"Error storing job $jobId", e)
        throw e
    }
  }

  override def getJob(jobId: String): Option[Job] = {
    // First try to find the job in the active jobs dataset
    getJobFromDataset(jobId, activeJobsDataset).orElse {
      // If not found, try the completed jobs dataset
      getJobFromDataset(jobId, completedJobsDataset)
    }
  }

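  /** Reads the most recent value for the key from a single dataset. An empty value is a
    * tombstone written by deleteJobFromDataset and is treated as "not found".
    */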
  private def getJobFromDataset(jobId: String, dataset: String): Option[Job] = {
    try {
      val getRequest = GetRequest(jobId.getBytes(StandardCharsets.UTF_8), dataset)
      val responseFuture = kvStore.get(getRequest)
      val response = Await.result(responseFuture, timeoutMs.millis)

      response.values match {
        case Success(values) if values.nonEmpty =>
          val latestValue = values.maxBy(_.millis)
          if (latestValue.bytes.isEmpty) {
            // An empty latest value is a tombstone: the job was deleted from this dataset
            None
          } else {
            deserializeJob(latestValue.bytes) match {
              case Success(job) => Some(job)
              case Failure(e) =>
                logger.error(s"Failed to deserialize job $jobId from $dataset", e)
                None
            }
          }
        case Success(_) =>
          // No values stored under this key
          None
        case Failure(e) =>
          logger.error(s"Error retrieving job $jobId from $dataset", e)
          None
      }
    } catch {
      case e: Exception =>
        logger.error(s"Error getting job $jobId from $dataset", e)
        None
    }
  }

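  /** Lists every job in the active dataset. Because completed jobs live in their own
    * dataset, this scan stays proportional to the number of active jobs.
    */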
  override def getAllActiveJobs: List[Job] = {
    try {
      val listRequest = KVStore.ListRequest(activeJobsDataset, Map.empty)
      val listResponseFuture = kvStore.list(listRequest)
      val listResponse = Await.result(listResponseFuture, timeoutMs.millis)

      listResponse.values match {
        case Success(values) =>
          values
            .filterNot(_.valueBytes.isEmpty) // skip tombstoned (deleted) entries
            .flatMap { listValue =>
              deserializeJob(listValue.valueBytes) match {
                case Success(job) => Some(job)
                case Failure(e) =>
                  logger.error("Failed to deserialize job from list", e)
                  None
              }
            }
            .toList
        case Failure(e) =>
          logger.error("Failed to list active jobs", e)
          List.empty
      }
    } catch {
      case e: Exception =>
        logger.error("Error listing active jobs", e)
        List.empty
    }
  }

  /** Uses tombstoning to mark records as deleted, since KVStore lacks a direct delete
    * operation. This prevents stale job entries from persisting when jobs move between
    * datasets.
    */
  private def deleteJobFromDataset(jobId: String, dataset: String): Unit = {
    try {
      // Create an empty value (this is effectively a "tombstone" marking the entry as deleted)
      val emptyValueRequest = PutRequest(
        jobId.getBytes(StandardCharsets.UTF_8),
        Array.emptyByteArray,
        dataset,
        Some(System.currentTimeMillis())
      )

      val result = Await.result(kvStore.put(emptyValueRequest), timeoutMs.millis)
      if (!result) {
        logger.error(s"Failed to delete job $jobId from $dataset")
      } else {
        logger.info(s"Successfully deleted job $jobId from $dataset")
      }
    } catch {
      case e: Exception =>
        logger.error(s"Error deleting job $jobId from $dataset", e)
    }
  }

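  /** Updates a job's status, moving it between the active and completed datasets when the
    * change crosses the active/completed boundary. Note that this read-modify-write is not
    * atomic: concurrent updates to the same job may interleave.
    */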
  override def updateJobStatus(jobId: String, status: JobStatusType): Unit = {
    try {
      getJob(jobId) match {
        case Some(job) =>
          val oldStatus = job.getJobInfo.getCurrentStatus
          val wasActive = isJobActive(job)

          // Update the status
          job.getJobInfo.setCurrentStatus(status)
          val isNowActive = isJobActive(job)

          // Check if we need to move between the active and completed datasets
          if (wasActive != isNowActive) {
            logger.info(s"Job $jobId moved from status $oldStatus to $status, moving between datasets")

            // Delete from the old dataset by writing a tombstone
            val oldDataset = if (wasActive) activeJobsDataset else completedJobsDataset
            deleteJobFromDataset(jobId, oldDataset)

            // Store in the new dataset (storeJob picks the correct one from the new status)
            storeJob(jobId, job)
          } else {
            // Just update the job in the same dataset
            storeJob(jobId, job)
          }
        case None =>
          logger.warn(s"Cannot update status for job $jobId: job not found")
      }
    } catch {
      case e: Exception =>
        logger.error(s"Error updating status for job $jobId", e)
    }
  }
}
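
// A minimal usage sketch (hypothetical: `SomeKVStoreImpl` stands in for any concrete
// KVStore; the JobStore trait is assumed to declare the methods overridden above):
//
//   val jobStore = new KVJobStore(new SomeKVStoreImpl())
//   jobStore.storeJob("job-123", job)                            // pending/running jobs land in AGENT_ACTIVE_JOBS
//   jobStore.updateJobStatus("job-123", JobStatusType.SUCCEEDED) // tombstones the active entry, rewrites to AGENT_COMPLETED_JOBS
//   val active = jobStore.getAllActiveJobs                       // scans only AGENT_ACTIVE_JOBS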