@@ -38,6 +38,10 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
38
38
// Fixed to BigQuery for now.
39
39
override def writeFormat (tableName : String ): Format = {
40
40
41
+ val tableId = BigQueryUtil .parseTableId(tableName)
42
+ assert(Option (tableId.getProject).isDefined, s " project required for ${tableName}" )
43
+ assert(Option (tableId.getDataset).isDefined, s " dataset required for ${tableName}" )
44
+
41
45
val tu = TableUtils (sparkSession)
42
46
val partitionColumnOption =
43
47
if (tu.tableReachable(tableName)) Map .empty else Map (" partitionField" -> tu.partitionColumn)
@@ -49,7 +53,7 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
49
53
), // todo(tchow): No longer needed after https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1320
50
54
" writeMethod" -> " indirect"
51
55
) ++ partitionColumnOption
52
- BQuery (bqOptions.getProjectId , sparkOptions)
56
+ BQuery (tableId.getProject , sparkOptions)
53
57
}
54
58
55
59
private def format (tableName : String ): Format = {
@@ -82,7 +86,7 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
82
86
BQuery (table.getTableId.getProject, Map .empty)
83
87
} else throw new IllegalStateException (s " Cannot support table of type: ${table.getDefinition}" )
84
88
})
85
- .getOrElse(Hive )
89
+ .getOrElse(Option (btTableIdentifier.getProject).map( BQuery (_, Map .empty)).getOrElse( Hive ) )
86
90
87
91
/**
88
92
* Using federation
0 commit comments