11
11
import com .fasterxml .jackson .databind .node .ObjectNode ;
12
12
import com .google .api .gax .rpc .HeaderProvider ;
13
13
import com .google .cloud .bigquery .BigQuery ;
14
+ import com .google .cloud .bigquery .BigQueryError ;
14
15
import com .google .cloud .bigquery .BigQueryException ;
15
16
import com .google .cloud .bigquery .Clustering ;
16
17
import com .google .cloud .bigquery .Dataset ;
17
18
import com .google .cloud .bigquery .DatasetInfo ;
18
19
import com .google .cloud .bigquery .Field ;
19
20
import com .google .cloud .bigquery .FieldList ;
21
+ import com .google .cloud .bigquery .InsertAllRequest ;
22
+ import com .google .cloud .bigquery .InsertAllResponse ;
20
23
import com .google .cloud .bigquery .Job ;
21
24
import com .google .cloud .bigquery .JobId ;
22
25
import com .google .cloud .bigquery .JobInfo ;
25
28
import com .google .cloud .bigquery .Schema ;
26
29
import com .google .cloud .bigquery .StandardSQLTypeName ;
27
30
import com .google .cloud .bigquery .StandardTableDefinition ;
31
+ import com .google .cloud .bigquery .Table ;
28
32
import com .google .cloud .bigquery .TableDefinition ;
29
33
import com .google .cloud .bigquery .TableId ;
30
34
import com .google .cloud .bigquery .TableInfo ;
31
35
import com .google .cloud .bigquery .TimePartitioning ;
32
36
import com .google .common .collect .ImmutableList ;
33
37
import com .google .common .collect .ImmutableMap ;
38
+ import io .airbyte .commons .exceptions .ConfigErrorException ;
34
39
import io .airbyte .commons .json .Jsons ;
35
40
import io .airbyte .config .WorkerEnvConstants ;
36
41
import io .airbyte .integrations .base .JavaBaseConstants ;
45
50
import java .time .format .DateTimeParseException ;
46
51
import java .util .ArrayList ;
47
52
import java .util .List ;
53
+ import java .util .Map ;
48
54
import java .util .Optional ;
49
55
import java .util .Set ;
50
56
import java .util .UUID ;
@@ -64,7 +70,8 @@ public class BigQueryUtils {
64
70
DateTimeFormatter .ofPattern ("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
65
71
"[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]" );
66
72
private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)" ;
67
- private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp" ;
73
// Suffix appended to the target dataset id to name the temporary dataset used by the connection check.
// The timestamp part is evaluated once, when this class is initialized, so every check executed in the
// same JVM reuses the same suffix.
+ private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System .currentTimeMillis ();
74
// Name of the temporary table created inside the check dataset for the test insert.
+ private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name" ;
68
75
69
76
public static ImmutablePair <Job , String > executeQuery (final BigQuery bigquery , final QueryJobConfiguration queryConfig ) {
70
77
final JobId jobId = JobId .of (UUID .randomUUID ().toString ());
@@ -119,16 +126,67 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d
119
126
120
127
public static void checkHasCreateAndDeleteDatasetRole (final BigQuery bigquery , final String datasetId , final String datasetLocation ) {
121
128
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX ;
122
- final Dataset dataset = bigquery .getDataset (tmpTestDatasetId );
129
+ final DatasetInfo datasetInfo = DatasetInfo .newBuilder (tmpTestDatasetId ).setLocation (datasetLocation ).build ();
130
+
131
+ bigquery .create (datasetInfo );
123
132
124
- // remove possible tmp datasets from previous execution
125
- if (dataset != null && dataset .exists ()) {
133
+ try {
134
+ attemptCreateTableAndTestInsert (bigquery , tmpTestDatasetId );
135
+ } finally {
126
136
bigquery .delete (tmpTestDatasetId );
127
137
}
138
+ }
128
139
129
- final DatasetInfo datasetInfo = DatasetInfo .newBuilder (tmpTestDatasetId ).setLocation (datasetLocation ).build ();
130
- bigquery .create (datasetInfo );
131
- bigquery .delete (tmpTestDatasetId );
140
+ /**
141
+ * Method is used to create tmp table and make dummy record insert. It's used in Check() connection
142
+ * method to make sure that user has all required roles for upcoming data sync/migration. It also
143
+ * verifies if BigQuery project is billable, if not - later sync will fail as non-billable project
144
+ * has limitations with stream uploading and DML queries. More details may be found there:
145
+ * https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery
146
+ * https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language
147
+ *
148
+ * @param bigquery - initialized bigquery client
149
+ * @param tmpTestDatasetId - dataset name where tmp table will be created
150
+ */
151
+ private static void attemptCreateTableAndTestInsert (final BigQuery bigquery , final String tmpTestDatasetId ) {
152
+ // Create dummy schema that will be used for tmp table creation
153
+ final Schema testTableSchema = Schema .of (
154
+ Field .of ("id" , StandardSQLTypeName .INT64 ),
155
+ Field .of ("name" , StandardSQLTypeName .STRING ));
156
+
157
+ // Create tmp table to verify if user has a create table permission. Also below we will do test
158
+ // records insert in it
159
+ final Table test_connection_table_name = createTable (bigquery , tmpTestDatasetId ,
160
+ CHECK_TEST_TMP_TABLE_NAME , testTableSchema );
161
+
162
+ // Try to make test (dummy records) insert to make sure that user has required permissions
163
+ try {
164
+ final InsertAllResponse response =
165
+ bigquery .insertAll (InsertAllRequest
166
+ .newBuilder (test_connection_table_name )
167
+ .addRow (Map .of ("id" , 1 , "name" , "James" ))
168
+ .addRow (Map .of ("id" , 2 , "name" , "Eugene" ))
169
+ .addRow (Map .of ("id" , 3 , "name" , "Angelina" ))
170
+ .build ());
171
+
172
+ if (response .hasErrors ()) {
173
+ // If any of the insertions failed, this lets you inspect the errors
174
+ for (Map .Entry <Long , List <BigQueryError >> entry : response .getInsertErrors ().entrySet ()) {
175
+ throw new ConfigErrorException ("Failed to check connection: \n " + entry .getValue ());
176
+ }
177
+ }
178
+ } catch (final BigQueryException e ) {
179
+ throw new ConfigErrorException ("Failed to check connection: \n " + e .getMessage ());
180
+ } finally {
181
+ test_connection_table_name .delete ();
182
+ }
183
+ }
184
+
185
+ public static Table createTable (final BigQuery bigquery , String datasetName , String tableName , Schema schema ) {
186
+ final TableId tableId = TableId .of (datasetName , tableName );
187
+ final TableDefinition tableDefinition = StandardTableDefinition .of (schema );
188
+ final TableInfo tableInfo = TableInfo .newBuilder (tableId , tableDefinition ).build ();
189
+ return bigquery .create (tableInfo );
132
190
}
133
191
134
192
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
0 commit comments