9
9
import com .fasterxml .jackson .databind .JsonNode ;
10
10
import com .fasterxml .jackson .databind .node .ObjectNode ;
11
11
import com .google .auth .oauth2 .ServiceAccountCredentials ;
12
- import com .google .cloud .bigquery .*;
12
+ import com .google .cloud .bigquery .BigQuery ;
13
+ import com .google .cloud .bigquery .BigQueryOptions ;
14
+ import com .google .cloud .bigquery .ConnectionProperty ;
15
+ import com .google .cloud .bigquery .Dataset ;
16
+ import com .google .cloud .bigquery .DatasetInfo ;
17
+ import com .google .cloud .bigquery .Field ;
18
+ import com .google .cloud .bigquery .FieldList ;
19
+ import com .google .cloud .bigquery .FieldValue ;
20
+ import com .google .cloud .bigquery .FieldValueList ;
21
+ import com .google .cloud .bigquery .Job ;
22
+ import com .google .cloud .bigquery .JobId ;
23
+ import com .google .cloud .bigquery .JobInfo ;
24
+ import com .google .cloud .bigquery .QueryJobConfiguration ;
25
+ import com .google .cloud .bigquery .TableResult ;
13
26
import com .google .common .collect .ImmutableMap ;
14
- import io .airbyte .db .bigquery .BigQueryResultSet ;
15
- import io .airbyte .db .bigquery .BigQuerySourceOperations ;
16
27
import com .google .common .collect .Streams ;
17
28
import io .airbyte .commons .json .Jsons ;
18
29
import io .airbyte .commons .resources .MoreResources ;
19
30
import io .airbyte .commons .string .Strings ;
31
+ import io .airbyte .db .bigquery .BigQueryResultSet ;
32
+ import io .airbyte .db .bigquery .BigQuerySourceOperations ;
20
33
import io .airbyte .integrations .base .JavaBaseConstants ;
21
34
import io .airbyte .integrations .destination .NamingConventionTransformer ;
22
35
import io .airbyte .integrations .destination .StandardNameTransformer ;
33
46
import java .nio .charset .StandardCharsets ;
34
47
import java .nio .file .Files ;
35
48
import java .nio .file .Path ;
36
- import java .util .*;
49
+ import java .util .Collections ;
50
+ import java .util .List ;
51
+ import java .util .Optional ;
52
+ import java .util .TimeZone ;
53
+ import java .util .UUID ;
37
54
import java .util .stream .Collectors ;
38
55
import org .apache .commons .lang3 .tuple .ImmutablePair ;
39
56
import org .junit .jupiter .params .ParameterizedTest ;
@@ -118,8 +135,8 @@ protected boolean supportObjectDataTypeTest() {
118
135
119
136
@ Override
120
137
protected void assertNamespaceNormalization (final String testCaseId ,
121
- final String expectedNormalizedNamespace ,
122
- final String actualNormalizedNamespace ) {
138
+ final String expectedNormalizedNamespace ,
139
+ final String actualNormalizedNamespace ) {
123
140
final String message = String .format ("Test case %s failed; if this is expected, please override assertNamespaceNormalization" , testCaseId );
124
141
if (testCaseId .equals ("S3A-1" )) {
125
142
// bigquery allows namespace starting with a number, and prepending underscore
@@ -145,10 +162,10 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
145
162
146
163
@ Override
147
164
protected List <JsonNode > retrieveRecords (final TestDestinationEnv env ,
148
- final String streamName ,
149
- final String namespace ,
150
- final JsonNode streamSchema )
151
- throws Exception {
165
+ final String streamName ,
166
+ final String namespace ,
167
+ final JsonNode streamSchema )
168
+ throws Exception {
152
169
final String tableName = namingResolver .getIdentifier (streamName );
153
170
final String schema = namingResolver .getIdentifier (namespace );
154
171
return retrieveRecordsFromTable (tableName , schema );
@@ -158,20 +175,20 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
158
175
TimeZone .setDefault (TimeZone .getTimeZone ("UTC" ));
159
176
160
177
final QueryJobConfiguration queryConfig =
161
- QueryJobConfiguration
162
- .newBuilder (
163
- String .format ("SELECT * FROM `%s`.`%s` order by %s asc;" , schema , tableName ,
164
- JavaBaseConstants .COLUMN_NAME_EMITTED_AT ))
178
+ QueryJobConfiguration
179
+ .newBuilder (
180
+ String .format ("SELECT * FROM `%s`.`%s` order by %s asc;" , schema , tableName ,
181
+ JavaBaseConstants .COLUMN_NAME_EMITTED_AT ))
165
182
// .setUseLegacySql(false)
166
- .setConnectionProperties (Collections .singletonList (ConnectionProperty .of ("time_zone" , "UTC" )))
167
- .build ();
183
+ .setConnectionProperties (Collections .singletonList (ConnectionProperty .of ("time_zone" , "UTC" )))
184
+ .build ();
168
185
169
186
final TableResult queryResults = executeQuery (bigquery , queryConfig ).getLeft ().getQueryResults ();
170
187
final FieldList fields = queryResults .getSchema ().getFields ();
171
188
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations ();
172
189
173
190
return Streams .stream (queryResults .iterateAll ())
174
- .map (fieldValues -> sourceOperations .rowToJson (new BigQueryResultSet (fieldValues , fields ))).collect (Collectors .toList ());
191
+ .map (fieldValues -> sourceOperations .rowToJson (new BigQueryResultSet (fieldValues , fields ))).collect (Collectors .toList ());
175
192
}
176
193
177
194
private boolean isAirbyteColumn (final String name ) {
@@ -294,9 +311,8 @@ private static Job waitForQuery(final Job queryJob) {
294
311
}
295
312
296
313
/**
297
- * Verify that the integration successfully writes normalized records successfully (without actually
298
- * running the normalization module) Tests a wide variety of messages an schemas (aspirationally,
299
- * anyway).
314
+ * Verify that the integration successfully writes normalized records successfully (without actually running the normalization module) Tests a wide
315
+ * variety of messages an schemas (aspirationally, anyway).
300
316
*/
301
317
@ ParameterizedTest
302
318
@ ArgumentsSource (DataArgumentsProvider .class )
0 commit comments