|
1 | 1 | package ai.chronon.integrations.cloud_gcp
|
2 | 2 |
|
3 |
| -import ai.chronon.api.ScalaJavaConversions.ListOps |
4 | 3 | import ai.chronon.spark.TableUtils
|
5 | 4 | import ai.chronon.spark.format.Format
|
6 | 5 | import ai.chronon.spark.format.FormatProvider
|
7 | 6 | import ai.chronon.spark.format.Hive
|
8 |
| -import com.google.cloud.bigquery._ |
| 7 | +import com.google.cloud.bigquery.BigQuery |
| 8 | +import com.google.cloud.bigquery.BigQueryOptions |
| 9 | +import com.google.cloud.bigquery.ExternalTableDefinition |
| 10 | +import com.google.cloud.bigquery.FormatOptions |
| 11 | +import com.google.cloud.bigquery.StandardTableDefinition |
| 12 | +import com.google.cloud.bigquery.Table |
| 13 | +import com.google.cloud.bigquery.TableDefinition |
9 | 14 | import com.google.cloud.bigquery.connector.common.BigQueryUtil
|
10 | 15 | import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
|
11 | 16 | import org.apache.spark.sql.SparkSession
|
@@ -49,19 +54,21 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
|
49 | 54 | BigQueryFormat(tableId.getProject, sparkOptions)
|
50 | 55 | }
|
51 | 56 |
|
52 |
| - private def getFormat(table: Table): Format = |
| 57 | + private[cloud_gcp] def getFormat(table: Table): Format = |
53 | 58 | table.getDefinition.asInstanceOf[TableDefinition] match {
|
54 | 59 |
|
55 | 60 | case definition: ExternalTableDefinition =>
|
56 |
| - val uris = definition.getSourceUris.toScala |
57 |
| - .map(uri => uri.stripSuffix("/*") + "/") |
58 |
| - |
59 |
| - assert(uris.length == 1, s"External table ${table.getFriendlyName} can be backed by only one URI.") |
60 |
| - |
61 | 61 | val formatOptions = definition.getFormatOptions
|
62 | 62 | .asInstanceOf[FormatOptions]
|
63 |
| - |
64 |
| - GCS(table.getTableId.getProject, uris.head, formatOptions.getType) |
| 63 | + val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition] |
| 64 | + val uri = Option(externalTable.getHivePartitioningOptions) |
| 65 | + .map(_.getSourceUriPrefix) |
| 66 | + .getOrElse { |
| 67 | + val uris = externalTable.getSourceUris |
| 68 | + require(uris.size == 1, s"External table ${table} can be backed by only one URI.") |
| 69 | + uris.get(0).replaceAll("/\\*\\.parquet$", "") |
| 70 | + } |
| 71 | + GCS(table.getTableId.getProject, uri, formatOptions.getType) |
65 | 72 |
|
66 | 73 | case _: StandardTableDefinition =>
|
67 | 74 | BigQueryFormat(table.getTableId.getProject, Map.empty)
|
|
0 commit comments