Skip to content

Refactor `TableUtils` into its own `catalog` module #679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ai.chronon.integrations.aws

import ai.chronon.spark.ChrononHudiKryoRegistrator
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.junit.Assert.assertEquals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ai.chronon.integrations.aws

import ai.chronon.spark.ChrononHudiKryoRegistrator
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.SparkSession
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.format.Format
import ai.chronon.spark.catalog.Format
import com.google.cloud.bigquery._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.FileSourceScanExec
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.TableUtils
import ai.chronon.spark.format.Format
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.catalog.Format
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
import org.apache.spark.sql.{DataFrame, SparkSession}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}

import ai.chronon.spark.catalog.{DefaultFormatProvider, Format, Iceberg}
import com.google.cloud.bigquery._
import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ai.chronon.api.MetaData
import ai.chronon.integrations.cloud_gcp.BigTableKVStore.ColumnFamilyQualifierString
import ai.chronon.integrations.cloud_gcp.BigTableKVStore.ColumnFamilyString
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.udf
import org.rogach.scallop.ScallopConf
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.format.{FormatProvider, Iceberg}
import ai.chronon.spark.{SparkSessionBuilder, TableUtils}
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.catalog.{FormatProvider, Iceberg, TableUtils}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.cloud.hadoop.fs.gcs.{
Expand Down
21 changes: 21 additions & 0 deletions spark/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ scala_library(
],
)

scala_library(
name = "catalog_lib",
srcs = glob(["src/main/scala/ai/chronon/spark/catalog/*.scala"]),
format = select({
"//tools/config:scala_2_13": False,
"//conditions:default": True,
}),
visibility = ["//visibility:public"],
deps = [
":lib",
"//api:lib",
"//api:thrift_java",
"//tools/build_rules/spark:spark-exec",
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.thrift:libthrift"),
maven_artifact("org.apache.logging.log4j:log4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-core"),
maven_artifact_with_suffix("io.delta:delta-spark"),
],
)

scala_library(
name = "submission_lib",
srcs = glob(["src/main/scala/ai/chronon/spark/submission/*.scala"]),
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, types}
import org.apache.spark.sql.types.{StringType, StructType}
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.spark.catalog.TableUtils

import scala.collection.{Seq, immutable, mutable}
import scala.collection.mutable.ListBuffer
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{StringType, StructType}
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.spark.catalog.TableUtils

import scala.collection.{Seq, immutable, mutable}
import scala.util.Try
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import ai.chronon.online.fetcher.{ConfPathOrName, FetchContext, FetcherMain, Met
import ai.chronon.online.{Api, MetadataDirWalker, MetadataEndPoint, TopicChecker}
import ai.chronon.orchestration.{JoinMergeNode, JoinPartNode}
import ai.chronon.spark.batch._
import ai.chronon.spark.format.Format
import ai.chronon.spark.catalog.{Format, TableUtils}
import ai.chronon.spark.stats.drift.{Summarizer, SummaryPacker, SummaryUploader}
import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob}
import ai.chronon.spark.streaming.JoinSourceRunner
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/Extensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.util.sketch.BloomFilter
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.spark.catalog.TableUtils

import java.util
import scala.collection.Seq
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import ai.chronon.api.{
TsUtils,
TimeRange
}
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.api.DataModel.ENTITIES
import ai.chronon.api.DataModel.EVENTS
import ai.chronon.api.Extensions._
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ai.chronon.aggregator.windowing.FiveMinuteResolution
import ai.chronon.aggregator.windowing.Resolution
import ai.chronon.aggregator.windowing.SawtoothOnlineAggregator
import ai.chronon.api
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.api.Accuracy
import ai.chronon.api.Constants
import ai.chronon.api.DataModel
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/Join.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ai.chronon.api.DataModel.ENTITIES
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.online.serde.SparkConversions
import ai.chronon.orchestration.{JoinBootstrapNode, JoinPartNode}
import ai.chronon.spark.Extensions._
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import ai.chronon.api
import ai.chronon.api.DataModel.ENTITIES
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange, PartitionSpec}
import ai.chronon.online.metrics.Metrics
import ai.chronon.orchestration.JoinBootstrapNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ai.chronon.spark
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions.ListOps
import ai.chronon.api.DateRange
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.orchestration.JoinDerivationNode
import ai.chronon.spark.Extensions._
import org.apache.spark.sql.functions.{coalesce, col, expr}
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.spark.Extensions._
import com.google.gson.Gson
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/LabelJoin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ai.chronon.api.{Builders, Constants, JoinPart, PartitionSpec, TimeUnit, W
import ai.chronon.api.DataModel.ENTITIES
import ai.chronon.api.DataModel.EVENTS
import ai.chronon.api.Extensions._
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.api.PartitionRange
import ai.chronon.online.metrics.Metrics
import ai.chronon.spark.Extensions._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package ai.chronon.spark
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SaveMode

import ai.chronon.spark.catalog.TableUtils
import java.io.File

object LocalTableExporter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import ai.chronon.online._
import ai.chronon.online.metrics._
import ai.chronon.online.serde._
import ai.chronon.spark.Extensions.{StructTypeOps, _}
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.functions.col
Expand Down
12 changes: 4 additions & 8 deletions spark/src/main/scala/ai/chronon/spark/MetadataExporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@ package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.spark.catalog.TableUtils
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter
import java.nio.file.Files
import java.nio.file.Paths
import scala.collection.immutable.Map
import java.io.{BufferedWriter, File, FileWriter}
import java.nio.file.{Files, Paths}

object MetadataExporter {
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import ai.chronon.online.serde.SparkConversions
import ai.chronon.orchestration.JoinBootstrapNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.{coalescedJoin, set_add}
import ai.chronon.spark.{BootstrapInfo, JoinUtils, TableUtils}
import ai.chronon.spark.{BootstrapInfo, JoinUtils}
import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit, typedLit}
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.spark.catalog.TableUtils

import scala.collection.Seq

Expand Down
6 changes: 2 additions & 4 deletions spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package ai.chronon.spark.batch

import ai.chronon.api
import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange, PartitionSpec}
import ai.chronon.api.DataModel
import ai.chronon.api.DataModel.{ENTITIES, EVENTS}
import ai.chronon.api.Extensions.{DateRangeOps, DerivationOps, GroupByOps, JoinPartOps, MetadataOps}
import ai.chronon.api.PartitionRange.toTimeRange
import ai.chronon.api.{Accuracy, Builders, Constants, DateRange, JoinPart, PartitionRange}
import ai.chronon.online.metrics.Metrics
import ai.chronon.orchestration.JoinPartNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.{GroupBy, JoinUtils, TableUtils}
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.{GroupBy, JoinUtils}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.util.sketch.BloomFilter
Expand Down
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import ai.chronon.api.PartitionRange.toTimeRange
import ai.chronon.api._
import ai.chronon.online.metrics.Metrics
import ai.chronon.online.serde.SparkConversions
import ai.chronon.spark.Analyzer
import ai.chronon.spark.Extensions._
import ai.chronon.spark.{GroupBy, JoinUtils, TableUtils}
import ai.chronon.spark.GroupBy
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DataType, StructType}
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange,
import ai.chronon.orchestration.JoinMergeNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.coalescedJoin
import ai.chronon.spark.{JoinUtils, TableUtils}
import ai.chronon.spark.JoinUtils
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_add, date_format, to_date}
import org.slf4j.{Logger, LoggerFactory}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ai.chronon.api.ScalaJavaConversions.JListOps
import ai.chronon.orchestration.SourceWithFilterNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.parseSkewKeys
import ai.chronon.spark.TableUtils
import ai.chronon.spark.catalog.TableUtils

import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.thrift.TBase
import ai.chronon.api.{EngineType, ParametricMacro, PartitionRange, ThriftJsonCodec}
import ai.chronon.spark.Extensions._
import ai.chronon.spark.{SparkSessionBuilder, TableUtils}
import ai.chronon.spark.SparkSessionBuilder
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.spark.catalog.TableUtils

import scala.collection.mutable
import scala.reflect.ClassTag
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.types.StructType

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.SparkSession

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import org.apache.spark.sql.SparkSession

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ai.chronon.spark.format
package ai.chronon.spark.catalog

import ai.chronon.spark.TableUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.sql.types.StructType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,28 @@
* limitations under the License.
*/

package ai.chronon.spark
package ai.chronon.spark.catalog
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding a new catalog module.


import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.{Constants, PartitionRange, PartitionSpec, Query, QueryUtils, TsUtils}
import ai.chronon.spark.Extensions._
import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
import ai.chronon.spark.format.{CreationUtils, FormatProvider, Iceberg}
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}

import java.io.{PrintWriter, StringWriter}
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}
import scala.collection.{Seq, mutable}
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.catalyst.util.QuotingUtils

/** Trait to track the table format in use by a Chronon dataset and some utility methods to help
* retrieve metadata / configure it appropriately at creation time
Expand Down Expand Up @@ -274,7 +272,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable

// Run tableProperties
Option(tableProperties).filter(_.nonEmpty).foreach { props =>
sql(alterTablePropertiesSql(tableName, props))
sql(CreationUtils.alterTablePropertiesSql(tableName, props))
}

val finalizedDf = if (autoExpand) {
Expand Down Expand Up @@ -564,7 +562,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
sql(expandTableQueryOpt.get)

// set a flag in table props to indicate that this is a dynamic table
sql(alterTablePropertiesSql(tableName, Map(Constants.ChrononDynamicTable -> true.toString)))
sql(CreationUtils.alterTablePropertiesSql(tableName, Map(Constants.ChrononDynamicTable -> true.toString)))
}
}

Expand Down
Loading