diff --git a/api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala b/api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
deleted file mode 100644
index 74b437f06c..0000000000
--- a/api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package ai.chronon.api.planner
-import ai.chronon.api.Extensions.GroupByOps
-import ai.chronon.api.{DataModel, GroupBy, MetaData, PartitionSpec, TableDependency, ThriftJsonCodec}
-import ai.chronon.orchestration.GroupByBackfillNode
-
-import scala.util.Try
-
-class GroupByOfflinePlanner(groupBy: GroupBy)(implicit outputPartitionSpec: PartitionSpec)
-    extends Planner[GroupBy](groupBy)(outputPartitionSpec) {
-
-  private def tableDeps: Seq[TableDependency] = TableDependencies.fromGroupBy(groupBy)
-
-  private def effectiveStepDays: Int = {
-    val defaultStepDays = if (groupBy.dataModel == DataModel.EVENTS) 15 else 1
-    val configuredStepDaysOpt = Option(groupBy.metaData.executionInfo).flatMap(e => Option(e.stepDays))
-    configuredStepDaysOpt.getOrElse(defaultStepDays)
-  }
-
-  val backfillNodeOpt: Option[GroupByBackfillNode] = for (execInfo <- Option(groupBy.metaData.executionInfo)) yield {
-    val metaData = MetaDataUtils.layer(groupBy.metaData,
-                                       "backfill",
-                                       groupBy.metaData.name + "/backfill",
-                                       tableDeps,
-                                       Some(effectiveStepDays))
-    metaData.executionInfo.setScheduleCron(execInfo.scheduleCron)
-    new GroupByBackfillNode().setGroupBy(groupBy).setMetaData(metaData)
-  }
-
-  override def offlineNodes: Seq[PlanNode] = ???
-
-  override def onlineNodes: Seq[PlanNode] = ???
-}
-object GroupByOfflinePlanner {
-  implicit class GroupByIsPlanNode(node: GroupBy) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      result
-    })
-  }
-}
diff --git a/api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala b/api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala
new file mode 100644
index 0000000000..92ca4effe3
--- /dev/null
+++ b/api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala
@@ -0,0 +1,45 @@
+package ai.chronon.api.planner
+import ai.chronon.api.Extensions.GroupByOps
+import ai.chronon.api.{DataModel, GroupBy, MetaData, PartitionSpec, TableDependency, ThriftJsonCodec}
+import ai.chronon.planner.{ConfPlan, GroupByBackfillNode, Node, NodeContent}
+
+import scala.util.Try
+
+class GroupByPlanner(groupBy: GroupBy)(implicit outputPartitionSpec: PartitionSpec)
+    extends Planner[GroupBy](groupBy)(outputPartitionSpec) {
+
+  private def tableDeps: Seq[TableDependency] = TableDependencies.fromGroupBy(groupBy)
+
+  private def effectiveStepDays: Int = {
+    val defaultStepDays = if (groupBy.dataModel == DataModel.EVENTS) 15 else 1
+    val configuredStepDaysOpt = Option(groupBy.metaData.executionInfo).flatMap(e => Option(e.stepDays))
+    configuredStepDaysOpt.getOrElse(defaultStepDays)
+  }
+
+  // execInfo can be heavy - and we don't want to duplicate it
+  private def eraseExecutionInfo: GroupBy = {
+    val result = groupBy.deepCopy()
+    result.metaData.unsetExecutionInfo()
+    result
+  }
+
+  def backfillNode: Node = {
+
+    val metaData = MetaDataUtils.layer(groupBy.metaData,
+                                       "backfill",
+                                       groupBy.metaData.name + "/backfill",
+                                       tableDeps,
+                                       Some(effectiveStepDays))
+
+    val node = new GroupByBackfillNode().setGroupBy(eraseExecutionInfo)
+
+    toNode(metaData, _.setGroupByBackfill(node), eraseExecutionInfo)
+  }
+
+  def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
+    ???
+  }
+
+  override def buildPlan: ConfPlan = ???
+}
+
diff --git a/api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala b/api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala
similarity index 52%
rename from api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala
rename to api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala
index a78ace97ab..dcd81fcd90 100644
--- a/api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala
+++ b/api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala
@@ -3,18 +3,33 @@ package ai.chronon.api.planner
 import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps}
 import ai.chronon.api.ScalaJavaConversions.{IterableOps, IteratorOps}
 import ai.chronon.api._
-import ai.chronon.api.planner.JoinOfflinePlanner._
-import ai.chronon.api.planner.GroupByOfflinePlanner._
-import ai.chronon.orchestration._
+import ai.chronon.planner._
 
-import scala.collection.mutable
-import scala.language.{implicitConversions, reflectiveCalls}
 import scala.collection.Seq
+import scala.language.{implicitConversions, reflectiveCalls}
 
-class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec)
+class JoinPlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec)
     extends Planner[Join](join)(outputPartitionSpec) {
 
-  val leftSourceNode: SourceWithFilterNode = {
+  // will mutate the join in place - use on deepCopy-ied objects only
+  private def unsetNestedMetadata(join: Join): Unit = {
+    join.unsetMetaData()
+    Option(join.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
+    Option(join.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
+    join.unsetOnlineExternalParts()
+  }
+
+  private def joinWithoutExecutionInfo: Join = {
+    val copied = join.deepCopy()
+    copied.metaData.unsetExecutionInfo()
+    Option(copied.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.metaData.unsetExecutionInfo()))
+    Option(copied.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.metaData.unsetExecutionInfo()))
+    copied.unsetOnlineExternalParts()
+    copied
+  }
+
+  val leftSourceNode: Node = {
+
     val left = join.left
     val result = new SourceWithFilterNode()
       .setSource(left)
@@ -28,18 +43,18 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     // at this point metaData.outputTable = join_namespace.source______
     val metaData = MetaDataUtils.layer(
      join.metaData,
-      JoinNodeType.LEFT_SOURCE.toString.toLowerCase(),
+      "left_source",
      outputTableName,
      TableDependencies.fromSource(join.left).toSeq,
      stepDays = Some(1)
    )
 
-    result.setMetaData(metaData)
+    toNode(metaData, _.setSourceWithFilter(result), result)
   }
 
-  val bootstrapNodeOpt: Option[JoinBootstrapNode] = Option(join.bootstrapParts).map { bootstrapParts =>
+  private val bootstrapNodeOpt: Option[Node] = Option(join.bootstrapParts).map { bootstrapParts =>
     val result = new JoinBootstrapNode()
-      .setJoin(join)
+      .setJoin(joinWithoutExecutionInfo)
 
     // bootstrap tables are unfortunately unique to the join - can't be re-used if a new join part is added
     val bootstrapNodeName = join.metaData.name + "/boostrap"
@@ -50,18 +65,29 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     val metaData = MetaDataUtils.layer(
      join.metaData,
-      JoinNodeType.BOOTSTRAP.toString.toLowerCase(),
+      "bootstrap",
      bootstrapNodeName,
      tableDeps,
      stepDays = Some(1)
    )
 
-    result.setMetaData(metaData)
+    val copy = result.deepCopy()
+    unsetNestedMetadata(copy.join)
+
+    toNode(metaData, _.setJoinBootstrap(result), copy)
+  }
+
+  private def copyAndEraseExecutionInfo(joinPart: JoinPart): JoinPart = {
+    val copy = joinPart.deepCopy()
+    copy.groupBy.metaData.unsetExecutionInfo()
+    copy
   }
 
-  private def buildJoinPartNode(joinPart: JoinPart): JoinPartNode = {
+  private def buildJoinPartNode(joinPart: JoinPart): Node = {
+
     val result = new JoinPartNode()
-      .setJoinPart(joinPart)
+      .setJoinPart(copyAndEraseExecutionInfo(joinPart))
       .setLeftDataModel(join.left.dataModel)
       .setLeftSourceTable(leftSourceNode.metaData.outputTable)
@@ -76,26 +102,29 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
       .map(_.stepDays)
       .getOrElse(joinPart.groupBy.dataModel match {
         case DataModel.ENTITIES => 1
-        case DataModel.EVENTS => 15
+        case DataModel.EVENTS   => 15
       })
 
     // pull conf params from the groupBy metadata, but use the join namespace to write to.
     val metaData = MetaDataUtils
      .layer(
        joinPart.groupBy.metaData,
-        JoinNodeType.RIGHT_PART.toString.toLowerCase(),
+        "right_part",
        partTable,
        deps,
        stepDays = Some(stepDays)
      )
      .setOutputNamespace(join.metaData.outputNamespace)
 
-    result.setMetaData(metaData)
+    val copy = result.deepCopy()
+    copy.joinPart.groupBy.unsetMetaData()
+
+    toNode(metaData, _.setJoinPart(result), copy)
   }
 
-  private val joinPartNodes: Seq[JoinPartNode] = join.joinParts.toScala.map { buildJoinPartNode }.toSeq
+  private val joinPartNodes: Seq[Node] = join.joinParts.toScala.map { buildJoinPartNode }.toSeq
 
-  val mergeNode: JoinMergeNode = {
+  val mergeNode: Node = {
     val result = new JoinMergeNode()
       .setJoin(join)
@@ -118,16 +147,22 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     val metaData = MetaDataUtils
      .layer(
        join.metaData,
-        JoinNodeType.MERGE.toString.toLowerCase(),
+        "merge",
        mergeNodeName,
        deps,
        stepDays = Some(1)
      )
 
-    result.setMetaData(metaData)
+    val copy = result.deepCopy()
+    unsetNestedMetadata(copy.join)
+    copy.join.unsetDerivations()
+    copy.join.unsetLabelParts()
+
+    toNode(metaData, _.setJoinMerge(result), copy)
   }
 
-  private val derivationNodeOpt: Option[JoinDerivationNode] = Option(join.derivations).map { _ =>
+  private val derivationNodeOpt: Option[Node] = Option(join.derivations).map { _ =>
+
     val result = new JoinDerivationNode()
       .setJoin(join)
@@ -136,13 +171,17 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     val metaData = MetaDataUtils
      .layer(
        join.metaData,
-        JoinNodeType.DERIVE.toString.toLowerCase(),
+        "derive",
        derivationNodeName,
        Seq(TableDependencies.fromTable(mergeNode.metaData.outputTable)),
        stepDays = Some(1)
      )
 
-    result.setMetaData(metaData)
+    val copy = result.deepCopy()
+    unsetNestedMetadata(copy.join)
+    copy.join.unsetLabelParts()
+
+    toNode(metaData, _.setJoinDerivation(result), copy)
   }
 
   // these need us to additionally (groupBy backfill) generate the snapshot tables
@@ -156,7 +195,7 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     )
     .getOrElse(Array.empty)
 
-  private val labelJoinNodeOpt: Option[LabelJoinNode] = Option(join.labelParts).map { labelParts =>
+  private val labelJoinNodeOpt: Option[Node] = Option(join.labelParts).map { labelParts =>
     val result = new LabelJoinNode()
       .setJoin(join)
@@ -171,35 +210,44 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
     val metaData = MetaDataUtils
      .layer(
        join.metaData,
-        JoinNodeType.LABEL_JOIN.toString.toLowerCase(),
+        "label_join",
        labelNodeName,
        labelPartDeps,
        stepDays = Some(1)
      )
 
-    result.setMetaData(metaData)
-  }
+    val copy = result.deepCopy()
+    unsetNestedMetadata(copy.join)
 
-  override def offlineNodes: Seq[PlanNode] = {
-    val result: mutable.ArrayBuffer[PlanNode] = mutable.ArrayBuffer.empty[PlanNode]
+    toNode(metaData, _.setLabelJoin(result), copy)
+  }
 
-    result.append(leftSourceNode)
-    bootstrapNodeOpt.foreach(bn => result.append(bn))
-    joinPartNodes.foreach(jpn => result.append(jpn))
+  def offlineNodes: Seq[Node] = {
+
+    Seq(leftSourceNode) ++
+      bootstrapNodeOpt ++
+      joinPartNodes ++
+      Seq(mergeNode) ++
+      derivationNodeOpt ++
+      snapshotLabelParts.map(_.groupBy).map { gb =>
+        new GroupByPlanner(gb).backfillNode
+      } ++
+      labelJoinNodeOpt
+  }
 
-    result.append(mergeNode)
-    derivationNodeOpt.foreach(dn => result.append(dn))
-    snapshotLabelParts.foreach(lp => result.append(lp.groupBy))
-    labelJoinNodeOpt.foreach(ljn => result.append(ljn))
-    result
+  def onlineNodes: Seq[Node] = {
+    // depends on all
+    ???
   }
-  override def onlineNodes: Seq[PlanNode] = ???
+
+  override def buildPlan: ConfPlan = ???
 }
 
-object JoinOfflinePlanner {
+object JoinPlanner {
+
   // will mutate the join in place - use on deepCopy-ied objects only
   private def unsetNestedMetadata(join: Join): Unit = {
     join.unsetMetaData()
     Option(join.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
@@ -207,82 +255,4 @@ object JoinOfflinePlanner {
     Option(join.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
     join.unsetOnlineExternalParts()
   }
 
-  implicit class LabelJoinNodeIsPlanNode(node: LabelJoinNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      unsetNestedMetadata(result.join)
-      result
-    })
-  }
-
-  implicit class JoinDerivationNodeIsPlanNode(node: JoinDerivationNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      unsetNestedMetadata(result.join)
-      result.join.unsetLabelParts()
-      result
-    })
-  }
-
-  implicit class JoinMergeNodeIsPlanNode(node: JoinMergeNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      unsetNestedMetadata(result.join)
-      result.join.unsetDerivations()
-      result.join.unsetLabelParts()
-      result
-    })
-  }
-
-  implicit class JoinPartNodeIsPlanNode(node: JoinPartNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      result.joinPart.groupBy.unsetMetaData()
-      result
-    })
-  }
-
-  implicit class JoinBootstrapNodeIsPlanNode(node: JoinBootstrapNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      unsetNestedMetadata(result.join)
-      result
-    })
-  }
-
-  implicit class SourceWithFilterNodeIsPlanNode(node: SourceWithFilterNode) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      result.unsetMetaData()
-      result
-    })
-  }
-
-  implicit class JoinIsPlanNode(node: Join) extends PlanNode {
-    override def metaData: MetaData = node.metaData
-    override def contents: Any = node
-    override def semanticHash: String = ThriftJsonCodec.hexDigest({
-      val result = node.deepCopy()
-      unsetNestedMetadata(node)
-      result
-    })
-  }
-
 }
diff --git a/api/src/main/scala/ai/chronon/api/planner/PlanNode.scala b/api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala
similarity index 82%
rename from api/src/main/scala/ai/chronon/api/planner/PlanNode.scala
rename to api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala
index e212af5e18..553f52dfd0 100644
--- a/api/src/main/scala/ai/chronon/api/planner/PlanNode.scala
+++ b/api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala
@@ -2,18 +2,13 @@ package ai.chronon.api.planner
 
 import ai.chronon.api.thrift.TBase
 import ai.chronon.api.{Constants, MetaData, ThriftJsonCodec}
+import ai.chronon.planner.{Node, NodeContent, SourceWithFilterNode}
 
 import java.io.File
 import scala.reflect.ClassTag
 import scala.util.Try
 
-trait PlanNode {
-  def metaData: MetaData
-  def contents: Any
-  def semanticHash: String
-}
-
-object PlanNode {
+object LocalRunner {
 
   private def listFiles(dir: String = "."): Seq[String] = {
     val baseDir = new File(dir)
@@ -43,7 +38,5 @@ object PlanNode {
       .flatMap(tryParsingConf[T])
       .toSeq
 
-  def planConfs[T](confs: Seq[T], planner: Planner[T]): Seq[PlanNode] = ???
-
-  def generatePlans(compiledFolder: String): Seq[PlanNode] = ???
 }
diff --git a/api/src/main/scala/ai/chronon/api/planner/Planner.scala b/api/src/main/scala/ai/chronon/api/planner/Planner.scala
index e8e4ec6e5b..c0f58316a8 100644
--- a/api/src/main/scala/ai/chronon/api/planner/Planner.scala
+++ b/api/src/main/scala/ai/chronon/api/planner/Planner.scala
@@ -1,11 +1,26 @@
 package ai.chronon.api.planner
 
-import ai.chronon.api.PartitionSpec
+import ai.chronon.api.thrift.TBase
+import ai.chronon.api.{MetaData, PartitionSpec, ThriftJsonCodec}
+import ai.chronon.planner.{ConfPlan, Node, NodeContent}
 
 import scala.collection.Seq
 
+case class Mode(name: String, nodes: Seq[Node], cron: String)
+
 abstract class Planner[T](conf: T)(implicit outputPartitionSpec: PartitionSpec) {
-  def offlineNodes: Seq[PlanNode]
-  def onlineNodes: Seq[PlanNode]
-  def metricsNodes: Seq[PlanNode] = ??? // TODO: Add later
+
+  def buildPlan: ConfPlan
+
+  def toNode[T <: TBase[_, _]: Manifest](metaData: MetaData, contentSetter: NodeContent => Unit, hashableNode: T): Node = {
+    val hash = ThriftJsonCodec.hexDigest(hashableNode)
+
+    val content = new NodeContent()
+    contentSetter(content)
+
+    new Node()
+      .setContent(content)
+      .setMetaData(metaData)
+      .setSemanticHash(hash)
+  }
 }
diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift
index 1f6e8f9dd1..ed295e1a2b 100644
--- a/api/thrift/orchestration.thrift
+++ b/api/thrift/orchestration.thrift
@@ -173,79 +173,6 @@ struct PhysicalNodeKey {
     2: optional PhysicalNodeType nodeType
 }
 
-// ====================== End of physical node types ======================
-// ====================== Modular Join Spark Job Args ======================
-
-struct SourceWithFilterNode {
-    1: optional api.MetaData metaData
-
-    2: optional api.Source source
-    3: optional map<string, list<string>> excludeKeys
-}
-
-struct JoinBootstrapNode {
-    1: optional api.MetaData metaData
-    2: optional api.Join join
-}
-
-struct JoinMergeNode {
-    1: optional api.MetaData metaData
-    2: optional api.Join join
-}
-
-struct JoinDerivationNode {
-    1: optional api.MetaData metaData
-    2: optional api.Join join
-}
-
-struct JoinPartNode {
-    1: optional api.MetaData metaData
-    2: optional string leftSourceTable
-    3: optional api.DataModel leftDataModel
-    4: optional api.JoinPart joinPart
-    5: optional map<string, list<string>> skewKeys
-}
-
-struct LabelJoinNode {
-    1: optional api.MetaData metaData
-    2: optional api.Join join
-}
-
-struct GroupByBackfillNode {
-    1: optional api.MetaData metaData
-    2: optional api.GroupBy groupBy
-}
-
-struct GroupByUploadNode {
-    1: optional api.MetaData metaData
-    2: optional api.GroupBy groupBy
-}
-
-struct GroupByStreamingNode {
-    1: optional api.MetaData metaData
-    2: optional api.GroupBy groupBy
-}
-
-union NodeUnion {
-    // join nodes
-    1: SourceWithFilterNode sourceWithFilter
-    2: JoinBootstrapNode joinBootstrap
-    3: JoinPartNode joinPart
-    4: JoinMergeNode joinMerge
-    5: JoinDerivationNode joinDerivation
-    6: LabelJoinNode labelJoin
-
-    // groupBy nodes
-    7: GroupByBackfillNode groupByBackfill
-    8: GroupByUploadNode groupByUpload
-    9: GroupByStreamingNode groupByStreaming
-
-    // stagingQuery nodes
-    10: api.StagingQuery stagingQuery
-
-    // TODO: add metrics nodes
-}
-
 enum NodeRunStatus {
     UNKNOWN = 0,
     WAITING = 1,
     RUNNING = 2,
     SUCCEEDED = 3,
     FAILED = 4
 }
 
-// ====================== End of Modular Join Spark Job Args ===================
-
-// ====================== Orchestration Service API Types ======================
-
 struct Conf {
     1: optional string name
     2: optional string hash
diff --git a/api/thrift/planner.thrift b/api/thrift/planner.thrift
new file mode 100644
index 0000000000..713eea5194
--- /dev/null
+++ b/api/thrift/planner.thrift
@@ -0,0 +1,101 @@
+namespace py ai.chronon.planner
+namespace java ai.chronon.planner
+
+include "common.thrift"
+include "api.thrift"
+
+struct SourceWithFilterNode {
+    2: optional api.Source source
+    3: optional map<string, list<string>> excludeKeys
+}
+
+struct JoinBootstrapNode {
+    2: optional api.Join join
+}
+
+struct JoinMergeNode {
+    2: optional api.Join join
+}
+
+struct JoinDerivationNode {
+    2: optional api.Join join
+}
+
+struct JoinPartNode {
+    2: optional string leftSourceTable
+    3: optional api.DataModel leftDataModel
+    4: optional api.JoinPart joinPart
+    5: optional map<string, list<string>> skewKeys
+}
+
+struct LabelJoinNode {
+    2: optional api.Join join
+}
+
+struct GroupByBackfillNode {
+    2: optional api.GroupBy groupBy
+}
+
+struct GroupByUploadNode {
+    2: optional api.GroupBy groupBy
+}
+
+struct GroupByStreamingNode {
+    2: optional api.GroupBy groupBy
+}
+
+struct JoinMetadataUpload {
+    2: optional api.Join join
+}
+
+union NodeContent {
+    // join nodes
+    1: SourceWithFilterNode sourceWithFilter
+    2: JoinBootstrapNode joinBootstrap
+    3: JoinPartNode joinPart
+    4: JoinMergeNode joinMerge
+    5: JoinDerivationNode joinDerivation
+    6: LabelJoinNode labelJoin
+
+    10: JoinMetadataUpload joinMetadataUpload
+
+
+    // groupBy nodes
+    100: GroupByBackfillNode groupByBackfill
+    101: GroupByUploadNode groupByUpload
+    102: GroupByStreamingNode groupByStreaming
+
+    // stagingQuery nodes
+    200: api.StagingQuery stagingQueryBackfill
+
+    // TODO: add metrics nodes
+}
+
+enum Mode {
+    BACKFILL = 0,
+    DEPLOY = 1,
+    MONITOR = 2
+}
+
+struct Node {
+    1: optional api.MetaData metaData
+    2: optional NodeContent content
+    3: optional string semanticHash
+    4: optional bool isLongRunning
+}
+
+// some nodes could be depended on by both offline and online nodes
+struct ConfPlan {
+    1: optional list<Node> nodes
+    2: optional map<Mode, string> terminalNodeNames
+}
+
+// CLI rough sketch
+// zipline backfill conf_name -- runs all nodes upstream of terminal offline node - prepares training data
+// zipline deploy conf_name -- runs all nodes upstream of terminal online node - makes it ready for serving
+// zipline run-only node_name -- runs one particular node
+// zipline run node_name -- runs all nodes upstream
+
+// zipline compile -- compiles python
+// zipline plan conf start end -- shows the node-step-graph that is produced by planner
+// zipline sync -- compiles and uploads confs of this branch to remote
\ No newline at end of file
diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala
index f8d2f54c76..afbd9cbb3e 100644
--- a/spark/src/main/scala/ai/chronon/spark/Driver.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -24,7 +24,7 @@ import ai.chronon.api.planner.RelevantLeftForJoinPart
 import ai.chronon.api.thrift.TBase
 import ai.chronon.online.{Api, MetadataDirWalker, MetadataEndPoint, TopicChecker}
 import ai.chronon.online.fetcher.{ConfPathOrName, FetchContext, FetcherMain, MetadataStore}
-import ai.chronon.orchestration.{JoinMergeNode, JoinPartNode}
+import ai.chronon.planner.{JoinMergeNode, JoinPartNode, SourceWithFilterNode}
 import ai.chronon.spark.batch._
 import ai.chronon.spark.catalog.{Format, TableUtils}
 import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob}
@@ -830,7 +830,7 @@ object Driver {
       val outputTable = JoinUtils.computeLeftSourceTableName(join)
 
       // Create a SourceWithFilterNode with the extracted information
-      val sourceWithFilterNode = new ai.chronon.orchestration.SourceWithFilterNode()
+      val sourceWithFilterNode = new SourceWithFilterNode()
       sourceWithFilterNode.setSource(source)
       sourceWithFilterNode.setExcludeKeys(join.skewKeys)
 
@@ -913,8 +913,6 @@ object Driver {
         .setName(joinPartTableName)
         .setOutputNamespace(outputNamespace)
 
-      joinPartNode.setMetaData(metadata)
-
       // Calculate the date range
       val endDate = args.endDate()
       val startDate = args.startPartitionOverride.getOrElse(args.endDate())
@@ -923,7 +921,7 @@ object Driver {
         .setEndDate(endDate)
 
       // Run the JoinPartJob
-      val joinPartJob = new JoinPartJob(joinPartNode, dateRange, showDf = false)(tableUtils)
+      val joinPartJob = new JoinPartJob(joinPartNode, metadata, dateRange, showDf = false)(tableUtils)
       joinPartJob.run()
-      logger.info(s"JoinPartJob completed. Output table: ${joinPartNode.metaData.outputTable}")
+      logger.info(s"JoinPartJob completed. Output table: ${metadata.outputTable}")
diff --git a/spark/src/main/scala/ai/chronon/spark/Join.scala b/spark/src/main/scala/ai/chronon/spark/Join.scala
index ec1930c392..2b77bbfa3c 100644
--- a/spark/src/main/scala/ai/chronon/spark/Join.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Join.scala
@@ -23,7 +23,7 @@ import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api._
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.online.serde.SparkConversions
-import ai.chronon.orchestration.{JoinBootstrapNode, JoinPartNode}
+import ai.chronon.planner.{JoinBootstrapNode, JoinPartNode}
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.JoinUtils._
 import ai.chronon.spark.batch._
@@ -366,9 +366,8 @@ class Join(joinConf: api.Join,
           .setLeftDataModel(joinConfCloned.getLeft.dataModel)
           .setJoinPart(joinPart)
           .setSkewKeys(skewKeysAsJava)
-          .setMetaData(joinPartNodeMetadata)
 
-        val joinPartJob = new JoinPartJob(joinPartNode, joinPartJobRange)
+        val joinPartJob = new JoinPartJob(joinPartNode, joinPartNodeMetadata, joinPartJobRange)
         val df = joinPartJob.run(Some(runContext)).map(df => joinPart -> df)
         Thread.currentThread().setName(s"done-$threadName")
diff --git a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala
index d1da54eacc..744f3b3eba 100644
--- a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala
+++ b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala
@@ -20,9 +20,8 @@ import ai.chronon.api
 import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange, PartitionSpec}
 import ai.chronon.api.DataModel.ENTITIES
 import ai.chronon.api.Extensions._
-import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.online.metrics.Metrics
-import ai.chronon.orchestration.JoinBootstrapNode
+import ai.chronon.planner.JoinBootstrapNode
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.JoinUtils.{coalescedJoin, leftDf, shouldRecomputeLeft, tablesToRecompute}
diff --git a/spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala b/spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
index 7c27d86f77..2112d5726d 100644
--- a/spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
@@ -4,7 +4,7 @@ import ai.chronon.api.Extensions._
 import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.api.DateRange
 import ai.chronon.spark.catalog.TableUtils
-import ai.chronon.orchestration.JoinDerivationNode
+import ai.chronon.planner.JoinDerivationNode
 import ai.chronon.spark.Extensions._
 import org.apache.spark.sql.functions.{coalesce, col, expr}
diff --git a/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala b/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
index 6721bec1c1..732b2848fd 100644
--- a/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
+++ b/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
@@ -21,7 +21,7 @@ import ai.chronon.api._
 import ai.chronon.api.DataModel.EVENTS
 import ai.chronon.api.Extensions._
 import ai.chronon.api.ScalaJavaConversions._
-import ai.chronon.api.planner.JoinOfflinePlanner
+import ai.chronon.api.planner.JoinPlanner
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.catalog.TableUtils
 import com.google.gson.Gson
@@ -531,10 +531,10 @@ object JoinUtils {
   }
 
   def computeLeftSourceTableName(join: api.Join)(implicit tableUtils: TableUtils): String = {
-    new JoinOfflinePlanner(join)(tableUtils.partitionSpec).leftSourceNode.metaData.cleanName
+    new JoinPlanner(join)(tableUtils.partitionSpec).leftSourceNode.metaData.cleanName
   }
 
   def computeFullLeftSourceTableName(join: api.Join)(implicit tableUtils: TableUtils): String = {
-    new JoinOfflinePlanner(join)(tableUtils.partitionSpec).leftSourceNode.metaData.outputTable
+    new JoinPlanner(join)(tableUtils.partitionSpec).leftSourceNode.metaData.outputTable
   }
 }
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala b/spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala
index d1b9432738..0cdcd63671 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala
@@ -4,7 +4,7 @@ import ai.chronon.api.Extensions.{BootstrapPartOps, DateRangeOps, ExternalPartOp
 import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.api.{Constants, DateRange, PartitionRange, PartitionSpec, StructField, StructType}
 import ai.chronon.online.serde.SparkConversions
-import ai.chronon.orchestration.JoinBootstrapNode
+import ai.chronon.planner.JoinBootstrapNode
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.JoinUtils.{coalescedJoin, set_add}
 import ai.chronon.spark.{BootstrapInfo, JoinUtils}
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala b/spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
index e69c92192a..063d146629 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
@@ -5,7 +5,7 @@ import ai.chronon.api.Extensions.{DateRangeOps, DerivationOps, GroupByOps, JoinP
 import ai.chronon.api.PartitionRange.toTimeRange
 import ai.chronon.api._
 import ai.chronon.online.metrics.Metrics
-import ai.chronon.orchestration.JoinPartNode
+import ai.chronon.planner.JoinPartNode
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.{GroupBy, JoinUtils}
@@ -23,7 +23,7 @@ case class JoinPartJobContext(leftDf: Option[DfWithStats],
                               tableProps: Map[String, String],
                               runSmallMode: Boolean)
 
-class JoinPartJob(node: JoinPartNode, range: DateRange, showDf: Boolean = false)(implicit tableUtils: TableUtils) {
+class JoinPartJob(node: JoinPartNode, metaData: MetaData, range: DateRange, showDf: Boolean = false)(implicit tableUtils: TableUtils) {
 
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
   implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
@@ -58,7 +58,7 @@ class JoinPartJob(node: JoinPartNode, range: DateRange, showDf: Boolean = false)
 
     JoinPartJobContext(Option(leftWithStats),
                       joinLevelBloomMapOpt,
-                      Option(node.metaData.tableProps).getOrElse(Map.empty[String, String]),
+                      Option(metaData.tableProps).getOrElse(Map.empty[String, String]),
                       runSmallMode)
   }
@@ -67,7 +67,7 @@ class JoinPartJob(node: JoinPartNode, range: DateRange, showDf: Boolean = false)
      jobContext,
      joinPart,
      dateRange,
-      node.metaData.outputTable
+      metaData.outputTable
    )
   }
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala b/spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
index ed9ce9f87e..78cd9daee7 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
@@ -4,7 +4,7 @@ import ai.chronon.api.DataModel.ENTITIES
 import ai.chronon.api.Extensions.{DateRangeOps, GroupByOps, JoinPartOps, MetadataOps, SourceOps}
 import ai.chronon.api.planner.RelevantLeftForJoinPart
 import ai.chronon.api.{Accuracy, Constants, DataModel, DateRange, JoinPart, PartitionRange, PartitionSpec, QueryUtils}
-import ai.chronon.orchestration.JoinMergeNode
+import ai.chronon.planner.JoinMergeNode
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.JoinUtils.coalescedJoin
 import ai.chronon.spark.JoinUtils
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala b/spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
index 3a434da481..6f1cccfeab 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
@@ -3,7 +3,7 @@ import ai.chronon.api.DataModel.EVENTS
 import ai.chronon.api.{Constants, DateRange}
 import ai.chronon.api.Extensions.{MetadataOps, _}
 import ai.chronon.api.ScalaJavaConversions.JListOps
-import ai.chronon.orchestration.SourceWithFilterNode
+import ai.chronon.planner.SourceWithFilterNode
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.JoinUtils.parseSkewKeys
 import ai.chronon.spark.catalog.TableUtils
diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
index 1ce651ba0a..5cc4d8b512 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
@@ -6,11 +6,11 @@ import ai.chronon.api.Extensions._
 import ai.chronon.spark.batch._
 
 import ai.chronon.api.{planner, _}
-import ai.chronon.orchestration.JoinBootstrapNode
-import ai.chronon.orchestration.JoinDerivationNode
-import ai.chronon.orchestration.JoinPartNode
-import ai.chronon.orchestration.JoinMergeNode
-import ai.chronon.orchestration.SourceWithFilterNode
+import ai.chronon.planner.JoinBootstrapNode
+import ai.chronon.planner.JoinDerivationNode
+import ai.chronon.planner.JoinPartNode
+import ai.chronon.planner.JoinMergeNode
+import ai.chronon.planner.SourceWithFilterNode
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark._
 import ai.chronon.spark.test.{DataFrameGen, TableTestUtils}
@@ -296,9 +296,8 @@ class ModularJoinTest extends AnyFlatSpec {
       .setLeftSourceTable(sourceOutputTable)
       .setLeftDataModel(joinConf.getLeft.dataModel)
       .setJoinPart(jp1)
-      .setMetaData(metaData)
 
-    val joinPartJob = new JoinPartJob(joinPartNode, joinPartJobRange)
+    val joinPartJob = new JoinPartJob(joinPartNode, metaData, joinPartJobRange)
     joinPartJob.run()
     tableUtils.sql(s"SELECT * FROM $joinPart1FullTableName").show()
@@ -315,9 +314,8 @@ class ModularJoinTest extends AnyFlatSpec {
       .setLeftSourceTable(sourceOutputTable)
       .setLeftDataModel(joinConf.getLeft.dataModel)
       .setJoinPart(jp2)
-      .setMetaData(metaData2)
 
-    val joinPart2Job = new JoinPartJob(joinPartNode2, metaData2, joinPartJobRange)
+    val joinPart2Job = new JoinPartJob(joinPartNode2, metaData2, joinPartJobRange)
     joinPart2Job.run()
     tableUtils.sql(s"SELECT * FROM $joinPart2FullTableName").show()
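Reviewer notes (not part of the patch). The semantic-hash convention above is worth calling out: `toNode` hashes a deep-copied payload from which `MetaData` (including nested join-part metadata and `executionInfo`) has been stripped, so renames and other cosmetic metadata edits leave `semanticHash` unchanged, while logical edits invalidate it. A minimal sketch of the same recipe applied to a merge node, mirroring the `mergeNode` logic in `JoinPlanner` (the helper name is illustrative and not in this diff):

```scala
import ai.chronon.api.ScalaJavaConversions.IteratorOps
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.planner.JoinMergeNode

// Hash only what affects the merge output: strip metadata everywhere and
// drop the pieces (derivations, label parts) that hash under their own nodes.
def mergeSemanticHash(node: JoinMergeNode): String = {
  val copy = node.deepCopy() // never mutate the node that carries the content
  copy.join.unsetMetaData()
  Option(copy.join.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
  copy.join.unsetDerivations()
  copy.join.unsetLabelParts()
  ThriftJsonCodec.hexDigest(copy)
}
```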
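On the `ConfPlan` shape: instead of separate offline/online node lists, the plan carries a single node list plus one terminal node name per `Mode`; a runner anchors on the terminal node and walks upstream through each node's table dependencies. A hypothetical lookup, assuming `terminalNodeNames` is keyed by `planner.Mode` as declared in planner.thrift, and using `scala.jdk.CollectionConverters` for brevity where the codebase would use its own `ScalaJavaConversions`:

```scala
import ai.chronon.planner.{ConfPlan, Mode, Node}
import scala.jdk.CollectionConverters._

// Resolve the node that `zipline backfill` / `zipline deploy` should anchor on.
def terminalNode(plan: ConfPlan, mode: Mode): Option[Node] = {
  val byName = plan.nodes.asScala.map(n => n.metaData.name -> n).toMap
  Option(plan.terminalNodeNames)
    .flatMap(names => Option(names.get(mode)))
    .flatMap(byName.get)
}
```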
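Since `NodeContent` is a thrift union, an executor can dispatch on the set field rather than pattern-matching on payload classes. A sketch relying on standard thrift-java codegen; `getSetField` and the `_Fields` constant names are assumptions from codegen conventions, not verified against the generated sources:

```scala
import ai.chronon.planner.{Node, NodeContent}

// Route a planned node to a description (or, in a real runner, to a job).
def describe(node: Node): String = node.content.getSetField match {
  case NodeContent._Fields.SOURCE_WITH_FILTER => s"filter left source for ${node.metaData.name}"
  case NodeContent._Fields.JOIN_PART          => s"compute join part ${node.metaData.name}"
  case NodeContent._Fields.GROUP_BY_BACKFILL  => s"backfill groupBy ${node.metaData.name}"
  case other                                  => s"unhandled node type: $other"
}
```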