From 10afcef7ac7fe5b24938cc549b95f82bc03ee383 Mon Sep 17 00:00:00 2001
From: Marco Ziccardi
Date: Mon, 14 Nov 2016 14:24:53 +0100
Subject: [PATCH] Make TableDataWriteChannel expose Job when upload completed

---
 .../cloud/bigquery/TableDataWriteChannel.java | 73 +++++++++++++++----
 .../cloud/bigquery/spi/BigQueryRpc.java       |  7 +-
 .../bigquery/spi/DefaultBigQueryRpc.java      | 11 ++-
 .../bigquery/TableDataWriteChannelTest.java   | 63 ++++++++++++----
 .../cloud/bigquery/it/ITBigQueryTest.java     | 25 ++++---
 .../com/google/cloud/BaseWriteChannel.java    | 11 ++-
 .../bigquery/snippets/BigQuerySnippets.java   | 12 +--
 .../bigquery/snippets/ITBigQuerySnippets.java | 13 ++--
 8 files changed, 154 insertions(+), 61 deletions(-)

diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java
index 8c96f4a587d4..34f027f3ced6 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java
@@ -17,19 +17,25 @@
 package com.google.cloud.bigquery;
 
 import static com.google.cloud.RetryHelper.runWithRetries;
-import static java.util.concurrent.Executors.callable;
 
 import com.google.cloud.BaseWriteChannel;
 import com.google.cloud.RestorableState;
 import com.google.cloud.RetryHelper;
 import com.google.cloud.WriteChannel;
+import com.google.common.base.MoreObjects;
 
+import java.util.Objects;
 import java.util.concurrent.Callable;
 
 /**
- * WriteChannel implementation to stream data into a BigQuery table.
+ * {@link WriteChannel} implementation to stream data into a BigQuery table. Use {@link #getJob()}
+ * to get the job used to insert streamed data. Note that {@link #getJob()} returns {@code null}
+ * until the channel is closed.
  */
-class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {
+public class TableDataWriteChannel extends
+    BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {
+
+  private Job job;
 
   TableDataWriteChannel(BigQueryOptions options,
       WriteChannelConfiguration writeChannelConfiguration) {
@@ -44,12 +50,15 @@ class TableDataWriteChannel extends BaseWriteChannel
   @Override
   protected void flushBuffer(final int length, final boolean last) {
     try {
-      runWithRetries(callable(new Runnable() {
-        @Override
-        public void run() {
-          getOptions().getRpc().write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
-        }
-      }), getOptions().getRetryParams(), BigQueryImpl.EXCEPTION_HANDLER, getOptions().getClock());
+      com.google.api.services.bigquery.model.Job jobPb = runWithRetries(
+          new Callable<com.google.api.services.bigquery.model.Job>() {
+            @Override
+            public com.google.api.services.bigquery.model.Job call() {
+              return getOptions().getRpc().write(
+                  getUploadId(), getBuffer(), 0, getPosition(), length, last);
+            }
+          }, getOptions().getRetryParams(), BigQueryImpl.EXCEPTION_HANDLER, getOptions().getClock());
+      job = jobPb != null ? Job.fromPb(getOptions().getService(), jobPb) : null;
     } catch (RetryHelper.RetryHelperException e) {
       throw BigQueryException.translateAndThrow(e);
     }
@@ -57,7 +66,7 @@ public void run() {
 
   @Override
   protected StateImpl.Builder stateBuilder() {
-    return StateImpl.builder(getOptions(), getEntity(), getUploadId());
+    return StateImpl.builder(getOptions(), getEntity(), getUploadId(), job);
   }
 
   private static String open(final BigQueryOptions options,
@@ -74,21 +83,39 @@ public String call() {
     }
   }
 
+  private void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * Returns the {@code Job} created to insert the rows. The job is available only once the upload
+   * has finished and the channel has been closed; this method returns {@code null} otherwise.
+   */
+  public Job getJob() {
+    return job;
+  }
+
   static class StateImpl
       extends BaseWriteChannel.BaseState<BigQueryOptions, WriteChannelConfiguration> {
 
-    private static final long serialVersionUID = -787362105981823738L;
+    private static final long serialVersionUID = -2692851818766876346L;
+
+    private final Job job;
 
     StateImpl(Builder builder) {
       super(builder);
+      this.job = builder.job;
    }
 
     static class Builder
         extends BaseWriteChannel.BaseState.Builder<BigQueryOptions, WriteChannelConfiguration> {
 
+      private final Job job;
+
       private Builder(BigQueryOptions options, WriteChannelConfiguration configuration,
-          String uploadId) {
+          String uploadId, Job job) {
         super(options, configuration, uploadId);
+        this.job = job;
       }
 
       public RestorableState<WriteChannel> build() {
@@ -97,15 +124,33 @@ public RestorableState<WriteChannel> build() {
     }
 
     static Builder builder(BigQueryOptions options, WriteChannelConfiguration config,
-        String uploadId) {
-      return new Builder(options, config, uploadId);
+        String uploadId, Job job) {
+      return new Builder(options, config, uploadId, job);
     }
 
     @Override
     public WriteChannel restore() {
       TableDataWriteChannel channel = new TableDataWriteChannel(serviceOptions, entity, uploadId);
       channel.restore(this);
+      channel.setJob(job);
       return channel;
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), job);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return super.equals(obj)
+          && obj instanceof StateImpl
+          && Objects.equals(job, ((StateImpl) obj).job);
+    }
+
+    @Override
+    protected MoreObjects.ToStringHelper toStringHelper() {
+      return super.toStringHelper().add("job", job);
+    }
   }
 }
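Usage sketch for the widened API (illustrative only; `bigquery` is an assumed, already-built BigQuery service handle, and the dataset/table names are placeholders):

    // getJob() stays null while the channel is open; it is populated by the
    // final flush that close() triggers.
    WriteChannelConfiguration configuration =
        WriteChannelConfiguration.newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.csv())
            .build();
    TableDataWriteChannel channel = bigquery.writer(configuration);
    try {
      channel.write(ByteBuffer.wrap("a,b\nc,d\n".getBytes(StandardCharsets.UTF_8)));
    } finally {
      channel.close();
    }
    Job job = channel.getJob(); // non-null only after close()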
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/BigQueryRpc.java
index 01bf226b844e..5bb53a6fd667 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/BigQueryRpc.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/BigQueryRpc.java
@@ -242,7 +242,8 @@ Tuple<String, Iterable<TableRow>> listTableData(String projectId, String dataset
   String open(JobConfiguration configuration);
 
   /**
-   * Uploads the provided data to the resumable upload session at the specified position.
+   * Uploads the provided data to the resumable upload session at the specified position. This
+   * method returns the job created to insert the rows only when {@code last} is {@code true}.
    *
    * @param uploadId the resumable upload session URI
   * @param toWrite a byte array of data to upload
   * @param destOffset offset in the destination where to upload data to
   * @param length the number of bytes to upload
   * @param last {@code true} indicates that the last chunk is being uploaded
+   * @return the job created to insert the rows when {@code last} is {@code true};
+   *     {@code null} otherwise
   * @throws BigQueryException upon failure
   */
-  void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
+  Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
       boolean last);
 }
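The new SPI contract in a nutshell; a sketch under the assumption of a two-chunk upload (`rpc`, `uploadId`, `firstChunk` and `lastChunk` are hypothetical names):

    // Intermediate chunks: no job is returned yet.
    com.google.api.services.bigquery.model.Job pb =
        rpc.write(uploadId, firstChunk, 0, 0L, firstChunk.length, false);
    // pb == null here.
    // Final chunk: the service answers with the load job's metadata.
    pb = rpc.write(uploadId, lastChunk, 0, firstChunk.length, lastChunk.length, true);
    // pb != null here; TableDataWriteChannel wraps it via Job.fromPb(...).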
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/DefaultBigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/DefaultBigQueryRpc.java
index 01d64b09f7e4..247c9e0db861 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/DefaultBigQueryRpc.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/DefaultBigQueryRpc.java
@@ -432,12 +432,13 @@ public String open(JobConfiguration configuration) {
   }
 
   @Override
-  public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
+  public Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
       boolean last) {
     try {
       GenericUrl url = new GenericUrl(uploadId);
-      HttpRequest httpRequest = bigquery.getRequestFactory().buildPutRequest(url,
-          new ByteArrayContent(null, toWrite, toWriteOffset, length));
+      HttpRequest httpRequest = bigquery.getRequestFactory()
+          .buildPutRequest(url, new ByteArrayContent(null, toWrite, toWriteOffset, length));
+      httpRequest.setParser(bigquery.getObjectParser());
       long limit = destOffset + length;
       StringBuilder range = new StringBuilder("bytes ");
       range.append(destOffset).append('-').append(limit - 1).append('/');
@@ -450,8 +451,9 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
       int code;
       String message;
       IOException exception = null;
+      HttpResponse response = null;
       try {
-        HttpResponse response = httpRequest.execute();
+        response = httpRequest.execute();
         code = response.getStatusCode();
         message = response.getStatusMessage();
       } catch (HttpResponseException ex) {
@@ -466,6 +468,7 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
         }
         throw new BigQueryException(code, message);
       }
+      return last && response != null ? response.parseAs(Job.class) : null;
     } catch (IOException ex) {
       throw translate(ex);
     }
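For reference, the Content-Range header assembled above follows standard resumable-upload semantics. A stand-alone sketch; the if/else tail is an assumption, since the diff context cuts off right after the '/' append:

    static String contentRange(long destOffset, int length, boolean last) {
      long limit = destOffset + length;
      StringBuilder range = new StringBuilder("bytes ");
      range.append(destOffset).append('-').append(limit - 1).append('/');
      // The total size is only known once the last chunk is sent; otherwise '*'.
      range.append(last ? String.valueOf(limit) : "*");
      return range.toString();
    }
    // contentRange(0, 1024, false) -> "bytes 0-1023/*"
    // contentRange(1024, 512, true) -> "bytes 1024-1535/1536"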
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java
index f5bf37a5bb94..55ee78173217 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java
@@ -22,11 +22,11 @@
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -65,6 +65,9 @@ public class TableDataWriteChannelTest {
   private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE;
   private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
   private static final Random RANDOM = new Random();
+  private static final LoadJobConfiguration JOB_CONFIGURATION =
+      LoadJobConfiguration.of(TABLE_ID, "URI");
+  private static final JobInfo JOB_INFO = JobInfo.of(JOB_CONFIGURATION);
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -72,23 +75,35 @@ public class TableDataWriteChannelTest {
   private BigQueryOptions options;
   private BigQueryRpcFactory rpcFactoryMock;
   private BigQueryRpc bigqueryRpcMock;
+  private BigQueryFactory bigqueryFactoryMock;
+  private BigQuery bigqueryMock;
+  private Job job;
+  private TableDataWriteChannel writer;
 
   @Before
   public void setUp() {
     rpcFactoryMock = createMock(BigQueryRpcFactory.class);
     bigqueryRpcMock = createMock(BigQueryRpc.class);
+    bigqueryFactoryMock = createMock(BigQueryFactory.class);
+    bigqueryMock = createMock(BigQuery.class);
+    expect(bigqueryMock.getOptions()).andReturn(options).anyTimes();
+    replay(bigqueryMock);
+    job = new Job(bigqueryMock, new JobInfo.BuilderImpl(JOB_INFO));
     expect(rpcFactoryMock.create(anyObject(BigQueryOptions.class))).andReturn(bigqueryRpcMock);
-    replay(rpcFactoryMock);
+    expect(bigqueryFactoryMock.create(anyObject(BigQueryOptions.class))).andReturn(bigqueryMock)
+        .anyTimes();
+    replay(rpcFactoryMock, bigqueryFactoryMock);
     options = BigQueryOptions.newBuilder()
         .setProjectId("projectid")
         .setServiceRpcFactory(rpcFactoryMock)
+        .setServiceFactory(bigqueryFactoryMock)
         .build();
   }
 
   @After
   public void tearDown() throws Exception {
-    verify(rpcFactoryMock, bigqueryRpcMock);
+    verify(rpcFactoryMock, bigqueryRpcMock, bigqueryFactoryMock, bigqueryMock);
   }
 
   @Test
@@ -97,6 +112,7 @@ public void testCreate() {
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertTrue(writer.isOpen());
+    assertNull(writer.getJob());
   }
 
   @Test
@@ -107,6 +123,7 @@ public void testCreateRetryableError() {
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertTrue(writer.isOpen());
+    assertNull(writer.getJob());
   }
 
   @Test
@@ -123,28 +140,30 @@ public void testWriteWithoutFlush() throws IOException {
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
+    assertNull(writer.getJob());
   }
 
   @Test
   public void testWriteWithFlush() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L),
-        eq(CUSTOM_CHUNK_SIZE), eq(false));
+    expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L),
+        eq(CUSTOM_CHUNK_SIZE), eq(false))).andReturn(null);
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     writer.setChunkSize(CUSTOM_CHUNK_SIZE);
     ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE);
     assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer));
     assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+    assertNull(writer.getJob());
   }
 
   @Test
   public void testWritesAndFlush() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L),
-        eq(DEFAULT_CHUNK_SIZE), eq(false));
+    expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L),
+        eq(DEFAULT_CHUNK_SIZE), eq(false))).andReturn(null);
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
@@ -158,19 +177,23 @@ public void testWritesAndFlush() throws IOException {
           Arrays.copyOfRange(
               capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1)));
     }
+    assertNull(writer.getJob());
   }
 
   @Test
   public void testCloseWithoutFlush() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
+    expect(bigqueryRpcMock.write(
+        eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)))
+        .andReturn(job.toPb());
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertTrue(writer.isOpen());
     writer.close();
     assertArrayEquals(new byte[0], capturedBuffer.getValue());
     assertTrue(!writer.isOpen());
+    assertEquals(job, writer.getJob());
   }
 
   @Test
@@ -178,8 +201,9 @@ public void testCloseWithFlush() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
     ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE),
-        eq(true));
+    expect(bigqueryRpcMock.write(
+        eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true)))
+        .andReturn(job.toPb());
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertTrue(writer.isOpen());
@@ -188,16 +212,20 @@ public void testCloseWithFlush() throws IOException {
     assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
     writer.close();
     assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length);
     assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE));
     assertTrue(!writer.isOpen());
+    assertEquals(job, writer.getJob());
   }
 
   @Test
   public void testWriteClosed() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
+    expect(bigqueryRpcMock.write(
+        eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)))
+        .andReturn(job.toPb());
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     writer.close();
+    assertEquals(job, writer.getJob());
     try {
       writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE));
       fail("Expected TableDataWriteChannel write to throw IOException");
@@ -211,9 +239,8 @@ public void testSaveAndRestore() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL);
     Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL);
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0),
-        captureLong(capturedPosition), eq(DEFAULT_CHUNK_SIZE), eq(false));
-    expectLastCall().times(2);
+    expect(bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0),
+        captureLong(capturedPosition), eq(DEFAULT_CHUNK_SIZE), eq(false))).andReturn(null).times(2);
     replay(bigqueryRpcMock);
     ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE);
     ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE);
@@ -221,6 +248,7 @@ public void testSaveAndRestore() throws IOException {
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1));
     assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0));
     assertEquals(new Long(0L), capturedPosition.getValues().get(0));
+    assertNull(writer.getJob());
     RestorableState<WriteChannel> writerState = writer.capture();
     WriteChannel restoredWriter = writerState.restore();
     assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2));
@@ -232,13 +260,16 @@ public void testSaveAndRestoreClosed() throws IOException {
     expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID);
     Capture<byte[]> capturedBuffer = Capture.newInstance();
-    bigqueryRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
+    expect(bigqueryRpcMock.write(
+        eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)))
+        .andReturn(job.toPb());
     replay(bigqueryRpcMock);
     writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION);
     writer.close();
+    assertEquals(job, writer.getJob());
     RestorableState<WriteChannel> writerState = writer.capture();
     RestorableState<WriteChannel> expectedWriterState =
-        TableDataWriteChannel.StateImpl.builder(options, LOAD_CONFIGURATION, UPLOAD_ID)
+        TableDataWriteChannel.StateImpl.builder(options, LOAD_CONFIGURATION, UPLOAD_ID, job)
            .setBuffer(null)
            .setChunkSize(DEFAULT_CHUNK_SIZE)
            .setIsOpen(false)
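Why the extra mocks: flushBuffer() now turns the returned protobuf into a Job via Job.fromPb(getOptions().getService(), jobPb), so the options built in setUp() must be able to hand back a BigQuery instance; the mocked BigQueryFactory supplies it. A condensed sketch of the dependency, using the names from the test above:

    // options -> bigqueryFactoryMock -> bigqueryMock, which backs the expected job:
    expect(bigqueryFactoryMock.create(anyObject(BigQueryOptions.class)))
        .andReturn(bigqueryMock).anyTimes();
    job = new Job(bigqueryMock, new JobInfo.BuilderImpl(JOB_INFO));
    // assertEquals(job, writer.getJob()) then compares jobs built on the same service.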
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index 9722b44a0ddd..084a9be94c3a 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -25,7 +25,6 @@
 import static org.junit.Assert.fail;
 
 import com.google.cloud.Page;
-import com.google.cloud.WriteChannel;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQuery.DatasetField;
 import com.google.cloud.bigquery.BigQuery.DatasetOption;
@@ -50,6 +49,7 @@
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
 import com.google.cloud.bigquery.LoadJobConfiguration;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.QueryRequest;
@@ -57,6 +57,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
@@ -180,6 +181,7 @@ public class ITBigQueryTest {
       + "\"BytesField\": \"" + BYTES_BASE64 + "\""
       + "}"
       + "}";
+
   private static final Set<String> PUBLIC_DATASETS =
       ImmutableSet.of("github_repos", "hacker_news", "noaa_gsod", "samples", "usa_names");
 
@@ -967,7 +969,7 @@ public void testCancelNonExistingJob() {
   }
 
   @Test
-  public void testInsertFromFile() throws InterruptedException {
+  public void testInsertFromFile() throws InterruptedException, IOException, TimeoutException {
     String destinationTableName = "test_insert_from_file_table";
     TableId tableId = TableId.of(DATASET, destinationTableName);
     WriteChannelConfiguration configuration = WriteChannelConfiguration.newBuilder(tableId)
@@ -975,15 +977,20 @@ public void testInsertFromFile() throws InterruptedException {
         .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
         .setSchema(TABLE_SCHEMA)
         .build();
-    try (WriteChannel channel = bigquery.writer(configuration)) {
+    TableDataWriteChannel channel = bigquery.writer(configuration);
+    try {
       channel.write(ByteBuffer.wrap(JSON_CONTENT.getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException e) {
-      fail("IOException was not expected");
-    }
-    // wait until the new table is created. If the table is never created the test will time-out
-    while (bigquery.getTable(tableId) == null) {
-      Thread.sleep(1000L);
+    } finally {
+      channel.close();
     }
+    Job job = channel.getJob().waitFor();
+    LoadStatistics statistics = job.getStatistics();
+    assertEquals(1L, statistics.getInputFiles().longValue());
+    assertEquals(2L, statistics.getOutputRows().longValue());
+    LoadJobConfiguration jobConfiguration = job.getConfiguration();
+    assertEquals(TABLE_SCHEMA, jobConfiguration.getSchema());
+    assertNull(jobConfiguration.getSourceUris());
+    assertNull(job.getStatus().getError());
     Page<List<FieldValue>> rows = bigquery.listTableData(tableId);
     int rowCount = 0;
     for (List<FieldValue> row : rows.getValues()) {
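The statistics assertions rely on getStatistics() returning the job-type-specific subclass; for a load job that is LoadStatistics. A hedged sketch of what the test checks:

    LoadStatistics statistics = job.getStatistics(); // LoadStatistics for a load job
    Long inputFiles = statistics.getInputFiles();    // 1: a single streamed source
    Long outputRows = statistics.getOutputRows();    // 2: two JSON rows were uploaded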
diff --git a/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java b/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java
index 4d01a60e4422..f7a67deece57 100644
--- a/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java
+++ b/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java
@@ -352,14 +352,17 @@ public boolean equals(Object obj) {
           && this.chunkSize == other.chunkSize;
     }
 
-    @Override
-    public String toString() {
+    protected MoreObjects.ToStringHelper toStringHelper() {
       return MoreObjects.toStringHelper(this)
           .add("entity", entity)
           .add("uploadId", uploadId)
           .add("position", position)
-          .add("isOpen", isOpen)
-          .toString();
+          .add("isOpen", isOpen);
+    }
+
+    @Override
+    public String toString() {
+      return toStringHelper().toString();
     }
   }
 }
diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java
index dce4ac19a8bc..85d556861a51 100644
--- a/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java
+++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java
@@ -23,7 +23,6 @@
 package com.google.cloud.examples.bigquery.snippets;
 
 import com.google.api.client.util.Charsets;
-import com.google.cloud.BaseWriteChannel;
 import com.google.cloud.Page;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
@@ -33,7 +32,6 @@
 import com.google.cloud.bigquery.BigQuery.TableListOption;
 import com.google.cloud.bigquery.BigQueryError;
 import com.google.cloud.bigquery.BigQueryException;
-import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.DatasetInfo;
@@ -53,6 +51,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
@@ -332,16 +331,15 @@ public Table getTableFromId(String projectId, String datasetName, String tableNa
   // [VARIABLE "my_dataset_name"]
   // [VARIABLE "my_table_name"]
   // [VARIABLE "StringValue1\nStringValue2\n"]
-  public BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> writeToTable(
-      String datasetName, String tableName, String csvData) throws IOException {
+  public TableDataWriteChannel writeToTable(String datasetName, String tableName, String csvData)
+      throws IOException {
     // [START writeToTable]
     TableId tableId = TableId.of(datasetName, tableName);
     WriteChannelConfiguration writeChannelConfiguration =
         WriteChannelConfiguration.newBuilder(tableId)
            .setFormatOptions(FormatOptions.csv())
            .build();
-    BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> writer =
-        bigquery.writer(writeChannelConfiguration);
+    TableDataWriteChannel writer = bigquery.writer(writeChannelConfiguration);
     // Write data to writer
     try {
       writer.write(ByteBuffer.wrap(csvData.getBytes(Charsets.UTF_8)));
@@ -349,6 +347,8 @@ public BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> writeToTable
     } catch (IOException e) {
       // Unable to write data
     }
     writer.close();
+    // Get load job
+    Job job = writer.getJob();
     // [END writeToTable]
     return writer;
   }
diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/bigquery/snippets/ITBigQuerySnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/bigquery/snippets/ITBigQuerySnippets.java
index df7811963c6a..38eaab709f04 100644
--- a/google-cloud-examples/src/test/java/com/google/cloud/examples/bigquery/snippets/ITBigQuerySnippets.java
+++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/bigquery/snippets/ITBigQuerySnippets.java
@@ -39,6 +39,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
@@ -57,6 +58,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 public class ITBigQuerySnippets {
 
@@ -170,16 +172,15 @@ public void testCreateGetAndDeleteDataset() throws InterruptedException {
   }
 
   @Test
-  public void testWriteAndListTableData() throws IOException, InterruptedException {
+  public void testWriteAndListTableData()
+      throws IOException, InterruptedException, TimeoutException {
     String tableName = "test_write_and_list_table_data";
     String fieldName = "string_field";
     assertNotNull(bigquerySnippets.createTable(DATASET, tableName, fieldName));
-    bigquerySnippets.writeToTable(DATASET, tableName, "StringValue1\nStringValue2\n");
+    TableDataWriteChannel channel =
+        bigquerySnippets.writeToTable(DATASET, tableName, "StringValue1\nStringValue2\n");
+    channel.getJob().waitFor();
     Page<List<FieldValue>> listPage = bigquerySnippets.listTableData(DATASET, tableName);
-    while (Iterators.size(listPage.iterateAll()) < 2) {
-      Thread.sleep(500);
-      listPage = bigquerySnippets.listTableData(DATASET, tableName);
-    }
     Iterator<List<FieldValue>> rowIterator = listPage.getValues().iterator();
     assertEquals("StringValue1", rowIterator.next().get(0).getStringValue());
     assertEquals("StringValue2", rowIterator.next().get(0).getStringValue());
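Taken together with the snippet change above, the end-to-end flow this patch enables looks roughly as follows (illustrative only; `bigquery`, the table name, and the data are placeholders):

    TableDataWriteChannel writer =
        bigquery.writer(WriteChannelConfiguration.newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.csv())
            .build());
    try {
      writer.write(ByteBuffer.wrap("StringValue1\nStringValue2\n".getBytes(Charsets.UTF_8)));
    } finally {
      writer.close();
    }
    // Block on the load job instead of polling the table, then surface errors.
    Job completed = writer.getJob().waitFor();
    if (completed.getStatus().getError() != null) {
      throw new RuntimeException(completed.getStatus().getError().toString());
    }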