|
33 | 33 | import com.google.cloud.bigquery.FieldList;
|
34 | 34 | import com.google.cloud.bigquery.FormatOptions;
|
35 | 35 | import com.google.cloud.bigquery.Job;
|
36 |
| -import com.google.cloud.bigquery.JobInfo; |
| 36 | +import com.google.cloud.bigquery.JobId; |
37 | 37 | import com.google.cloud.bigquery.LegacySQLTypeName;
|
38 |
| -import com.google.cloud.bigquery.LoadJobConfiguration; |
39 | 38 | import com.google.cloud.bigquery.Schema;
|
40 | 39 | import com.google.cloud.bigquery.StandardTableDefinition;
|
| 40 | +import com.google.cloud.bigquery.TableDataWriteChannel; |
41 | 41 | import com.google.cloud.bigquery.TableDefinition;
|
42 | 42 | import com.google.cloud.bigquery.TableId;
|
43 | 43 | import com.google.cloud.bigquery.TableInfo;
|
| 44 | +import com.google.cloud.bigquery.WriteChannelConfiguration; |
44 | 45 | import com.google.cloud.retail.v2.CreateProductRequest;
|
45 | 46 | import com.google.cloud.retail.v2.DeleteProductRequest;
|
46 | 47 | import com.google.cloud.retail.v2.FulfillmentInfo;
|
|
71 | 72 | import com.google.protobuf.Int32Value;
|
72 | 73 | import com.google.protobuf.Timestamp;
|
73 | 74 | import java.io.IOException;
|
| 75 | +import java.io.OutputStream; |
| 76 | +import java.nio.channels.Channels; |
74 | 77 | import java.nio.file.Files;
|
75 | 78 | import java.nio.file.Paths;
|
76 | 79 | import java.time.Instant;
|
@@ -349,27 +352,37 @@ public static void createBqTable(String datasetName, String tableName, Schema sc
|
349 | 352 | }
|
350 | 353 | }
|
351 | 354 |
|
352 |
| - public static void uploadDataToBqTable( |
353 |
| - String datasetName, String tableName, String sourceUri, Schema schema) { |
| 355 | + public static void uploadDataToBqTable(String datasetName, String tableName, String sourceUri) { |
354 | 356 | try {
|
355 | 357 | BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
|
356 | 358 | TableId tableId = TableId.of(datasetName, tableName);
|
357 |
| - LoadJobConfiguration loadConfig = |
358 |
| - LoadJobConfiguration.newBuilder(tableId, sourceUri) |
| 359 | + |
| 360 | + WriteChannelConfiguration writeChannelConfiguration = |
| 361 | + WriteChannelConfiguration.newBuilder(tableId) |
359 | 362 | .setFormatOptions(FormatOptions.json())
|
360 |
| - .setSchema(schema) |
361 | 363 | .build();
|
362 |
| - Job job = bigquery.create(JobInfo.of(loadConfig)); |
363 |
| - job = job.waitFor(); |
| 364 | + |
| 365 | + String jobName = "jobId_" + UUID.randomUUID(); |
| 366 | + JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build(); |
| 367 | + |
| 368 | + try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration); |
| 369 | + OutputStream stream = Channels.newOutputStream(writer)) { |
| 370 | + Files.copy(Paths.get(sourceUri), stream); |
| 371 | + } |
| 372 | + |
| 373 | + Job job = bigquery.getJob(jobId); |
| 374 | + Job completedJob = job.waitFor(); |
364 | 375 | if (job.isDone()) {
|
365 |
| - System.out.printf("Json from GCS successfully loaded in a table '%s'.%n", tableName); |
| 376 | + System.out.printf("Json successfully loaded in a table '%s'.%n", tableName); |
366 | 377 | } else {
|
367 | 378 | System.out.println(
|
368 | 379 | "BigQuery was unable to load into the table due to an error:"
|
369 | 380 | + job.getStatus().getError());
|
370 | 381 | }
|
371 | 382 | } catch (BigQueryException | InterruptedException e) {
|
372 | 383 | System.out.printf("Column not added during load append: %s%n", e.getMessage());
|
| 384 | + } catch (IOException e) { |
| 385 | + System.out.printf("Error copying file: %s%n", e.getMessage()); |
373 | 386 | }
|
374 | 387 | }
|
375 | 388 |
|
|
0 commit comments