Cache table permission check #377

Closed · wants to merge 2 commits
@@ -1,5 +1,7 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.TableUtils
import ai.chronon.spark.TableUtils.TableCreationStatus
import ai.chronon.spark.format.Format
import com.google.cloud.bigquery.connector.common.BigQueryUtil
import com.google.cloud.spark.bigquery.SchemaConverters
@@ -26,13 +28,14 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
implicit sparkSession: SparkSession): Seq[String] =
super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)
override def createTable(df: DataFrame,
tableName: String,
partitionColumns: Seq[String],
tableProperties: Map[String, String],
fileFormat: String): (String => Unit) => Unit = {
override def generateTableBuilder(df: DataFrame,
tableName: String,
partitionColumns: Seq[String],
tableProperties: Map[String, String],
fileFormat: String): (String => Unit) => TableCreationStatus = {

def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(sqlEvaluator: String => Unit) = {
def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(
sqlEvaluator: String => Unit): TableCreationStatus = {

// See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
// "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
@@ -57,8 +60,8 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
val tableInfoBuilder = TableInfo.newBuilder(shadedTableId, tableDefinition.build)

val tableInfo = tableInfoBuilder.build

bqClient.create(tableInfo)
TableUtils.TableCreatedWithoutInitialData
}

inner(df, tableName, partitionColumns)
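
A note on the signature change: `createTable` becomes `generateTableBuilder`, which no longer creates the table eagerly but instead returns a curried builder of type `(String => Unit) => TableCreationStatus`, so the caller supplies the SQL evaluator and gets a creation status back. A minimal sketch of a hypothetical caller, assuming `generateTableBuilder` is declared on the `Format` trait as these overrides suggest; the helper name and wiring are illustrative, not code from this PR:

import ai.chronon.spark.TableUtils.TableCreationStatus
import ai.chronon.spark.format.Format
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative sketch only: shows the shape of the curried builder contract.
def createIfNeeded(format: Format,
                   df: DataFrame,
                   tableName: String,
                   partitionColumns: Seq[String],
                   spark: SparkSession): TableCreationStatus = {
  val builder: (String => Unit) => TableCreationStatus =
    format.generateTableBuilder(df, tableName, partitionColumns, Map.empty, "PARQUET")

  builder { ddl =>
    // BigQuery-backed formats above ignore this evaluator and call the BigQuery client directly;
    // SQL-backed formats would run their generated DDL through it.
    spark.sql(ddl)
    ()
  }
}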
@@ -1,15 +1,33 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.ScalaJavaConversions.JListOps
import ai.chronon.spark.TableUtils
import ai.chronon.spark.TableUtils.{TableCreatedWithInitialData, TableCreationStatus}
import ai.chronon.spark.format.Format
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import com.google.cloud.bigquery.connector.common.BigQueryUtil
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
BigQuery,
BigQueryOptions,
ExternalTableDefinition,
FormatOptions,
HivePartitioningOptions,
TableInfo
}
import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.slf4j.LoggerFactory

case class GCS(sourceUri: String, fileFormat: String) extends Format {

private lazy val logger = LoggerFactory.getLogger(this.getClass.getName)

private lazy val bqOptions = BigQueryOptions.getDefaultInstance
lazy val bigQueryClient: BigQuery = bqOptions.getService

override def name: String = fileFormat

override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
@@ -72,6 +90,62 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
}.toMap)
}

override def generateTableBuilder(df: DataFrame,
tableName: String,
partitionColumns: Seq[String],
tableProperties: Map[String, String],
fileFormat: String): (String => Unit) => TableCreationStatus = {

def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(sqlEvaluator: String => Unit) = {

// See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
// "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
require(partitionColumns.size < 2,
s"BigQuery only supports at most one partition column, incoming spec: ${partitionColumns}")

val shadedTableId = BigQueryUtil.parseTableId(tableName)

val writePrefix = TableUtils(df.sparkSession).writePrefix
require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")

val path = writePrefix.get + tableName.sanitize + "/" //split("/").map(_.sanitize).mkString("/")
val dataGlob = path + "*"

logger.info(s"""
|table source uri: $dataGlob
|partition uri: $path
|""".stripMargin)

df.write
.partitionBy(partitionColumns: _*)
.mode("overwrite") // or "append" based on your needs
.parquet(path)

val baseTableDef = ExternalTableDefinition
.newBuilder(dataGlob, FormatOptions.parquet())
.setAutodetect(true)

if (partitionColumns.nonEmpty) {
val timePartitioning = HivePartitioningOptions
.newBuilder()
.setFields(partitionColumns.toJava)
.setSourceUriPrefix(path)
.setMode("STRINGS")
.build()
baseTableDef.setHivePartitioningOptions(timePartitioning)
}

val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
val createdTable = bigQueryClient.create(tableInfo)

println(s"Created external table ${createdTable.getTableId}")

TableCreatedWithInitialData
}

inner(df, tableName, partitionColumns)
}
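
To make the GCS wiring concrete: the DataFrame is written as Hive-partitioned Parquet under `writePrefix + tableName.sanitize + "/"`, and the BigQuery external table is registered with that prefix plus `*` as its source URI glob and the prefix itself as the Hive-partitioning root. A small sketch with assumed values (bucket, sanitized table name, and partition value are hypothetical, not taken from this PR):

// Hypothetical values, for illustration only.
val writePrefix = "gs://my-chronon-bucket/tables/"        // spark.chronon.table_write.prefix
val sanitized   = "my_dataset_my_table"                    // assumed result of tableName.sanitize
val path        = writePrefix + sanitized + "/"
val dataGlob    = path + "*"                               // becomes the external table's source URI

// After df.write.partitionBy("ds").parquet(path), objects land under Hive-style directories, e.g.:
//   gs://my-chronon-bucket/tables/my_dataset_my_table/ds=2025-01-01/part-00000.parquet
// HivePartitioningOptions(sourceUriPrefix = path, mode = "STRINGS") lets BigQuery derive the
// `ds` partition column from those directory names, typed as strings.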

def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")

def fileFormatString(format: String): String = ""
@@ -1,18 +1,9 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.format.Format
import ai.chronon.spark.format.FormatProvider
import ai.chronon.api.Extensions.StringOps
import ai.chronon.spark.TableUtils
import ai.chronon.spark.format.{Format, FormatProvider}
import com.google.cloud.bigquery.connector.common.BigQueryUtil
import com.google.cloud.spark.bigquery.SparkBigQueryConfig
import com.google.cloud.spark.bigquery.SparkBigQueryConfig.IntermediateFormat
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQuery
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.ExternalTableDefinition
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.FormatOptions
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.JobInfo
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.StandardTableDefinition
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Table
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableDefinition
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
import org.apache.spark.sql.SparkSession

import scala.jdk.CollectionConverters._
@@ -38,24 +29,14 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
}
.getOrElse(tableName)

override def readFormat(tableName: String): Option[Format] = format(tableName)
override def readFormat(tableName: String): scala.Option[Format] = format(tableName)

override def writeFormat(table: String): Format = {
val tableId = BigQueryUtil.parseTableId(table)
assert(scala.Option(tableId.getProject).isDefined, s"project required for ${table}")
assert(scala.Option(tableId.getDataset).isDefined, s"dataset required for ${table}")
val writePrefix = TableUtils(sparkSession).writePrefix
require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")

val sparkOptions: Map[String, String] = Map(
"temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
"writeMethod" -> "indirect",
SparkBigQueryConfig.INTERMEDIATE_FORMAT_OPTION -> IntermediateFormat.PARQUET.getDataSource,
SparkBigQueryConfig.ENABLE_LIST_INFERENCE -> true.toString,
"materializationProject" -> tableId.getProject,
"materializationDataset" -> tableId.getDataset,
"createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
)

BigQueryFormat(tableId.getProject, bigQueryClient, sparkOptions)
val path = writePrefix.get + table.sanitize //split("/").map(_.sanitize).mkString("/")
GCS(path, "PARQUET")
}

private[cloud_gcp] def getFormat(table: Table): Format =
@@ -65,7 +46,8 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
val formatOptions = definition.getFormatOptions
.asInstanceOf[FormatOptions]
val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition]
val uri = Option(externalTable.getHivePartitioningOptions)
val uri = scala
.Option(externalTable.getHivePartitioningOptions)
.map(_.getSourceUriPrefix)
.getOrElse {
val uris = externalTable.getSourceUris.asScala
@@ -81,10 +63,10 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
}

private def format(tableName: String): Option[Format] = {
private def format(tableName: String): scala.Option[Format] = {

val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
val table = Option(bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
val table = scala.Option(bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
table
.map(getFormat)

2 changes: 2 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -326,6 +326,7 @@ object Extensions {
case "hive" | "delta" | "iceberg" =>
optionDfw
.format(normalized)
.partitionBy(null: _*)
.insertInto(dataPointer.tableOrPath)
case _ =>
throw new UnsupportedOperationException(s"Unsupported write catalog: ${normalized}")
@@ -335,6 +336,7 @@
// None case is just table against default catalog
optionDfw
.format("hive")
.partitionBy(null: _*)
.insertInto(dataPointer.tableOrPath))
}
}
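
The two `.partitionBy(null: _*)` additions above deserve a note: `DataFrameWriter.partitionBy` stores `Option(colNames)`, so passing a null varargs sequence resets the writer's partition spec to None. That matters because `insertInto` refuses to run when `partitionBy` has been set on the same writer, and takes partitioning from the target table instead. A minimal sketch of the failure mode this avoids; the session setup, table name `db.events`, and `ds` column are assumptions for illustration:

// Illustrative only; assumes a Hive-enabled session and an existing partitioned table `db.events`.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("partitionBy-reset").enableHiveSupport().getOrCreate()
import spark.implicits._

val df = Seq((1, "2025-01-01"), (2, "2025-01-02")).toDF("id", "ds")

val writer = df.write.format("hive").partitionBy("ds")   // partition spec now set on the writer

// writer.insertInto("db.events")
//   would throw AnalysisException: insertInto() can't be used together with partitionBy()

writer
  .partitionBy(null: _*)          // Option(null) becomes None, clearing the partition spec
  .insertInto("db.events")        // partition layout is taken from the target table instead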