Skip to content

Make TableDataWriteChannel expose Job when upload completed #1395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@
package com.google.cloud.bigquery;

import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.common.base.MoreObjects;

import java.util.Objects;
import java.util.concurrent.Callable;

/**
 * {@link WriteChannel} implementation to stream data into a BigQuery table. Use {@link #getJob()}
 * to get the job used to insert streamed data. Note that {@link #getJob()} returns
 * {@code null} until the channel is closed.
 */
class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {
public class TableDataWriteChannel extends
BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {

private Job job;

TableDataWriteChannel(BigQueryOptions options,
WriteChannelConfiguration writeChannelConfiguration) {
Expand All @@ -44,20 +50,23 @@ class TableDataWriteChannel extends BaseWriteChannel<BigQueryOptions, WriteChann
@Override
protected void flushBuffer(final int length, final boolean last) {
  try {
    // Upload the buffered chunk with retries. The RPC returns the protobuf of the job
    // created to insert the rows only on the final chunk (last == true), null otherwise.
    // NOTE(review): the diff residue previously kept both the old Runnable-based call and
    // this Callable-based one, which would have written the same chunk twice; only the
    // Callable form (whose return value carries the job) is kept.
    com.google.api.services.bigquery.model.Job jobPb = runWithRetries(
        new Callable<com.google.api.services.bigquery.model.Job>() {
          @Override
          public com.google.api.services.bigquery.model.Job call() {
            return getOptions().getRpc().write(
                getUploadId(), getBuffer(), 0, getPosition(), length, last);
          }
        }, getOptions().getRetryParams(), BigQueryImpl.EXCEPTION_HANDLER, getOptions().getClock());
    // Materialize the user-facing Job wrapper once the upload completes.
    job = jobPb != null ? Job.fromPb(getOptions().getService(), jobPb) : null;
  } catch (RetryHelper.RetryHelperException e) {
    throw BigQueryException.translateAndThrow(e);
  }
}

@Override
protected StateImpl.Builder stateBuilder() {
  // Capture the insert job (null until the upload completes) in the saved state so a
  // restored channel can still expose it via getJob(). The diff residue previously left
  // an unreachable second return statement here, which is a compile error in Java.
  return StateImpl.builder(getOptions(), getEntity(), getUploadId(), job);
}

private static String open(final BigQueryOptions options,
Expand All @@ -74,21 +83,39 @@ public String call() {
}
}

/** Sets the job that inserted the streamed rows; used when restoring saved channel state. */
private void setJob(Job job) {
this.job = job;
}

/**
 * Returns the {@code Job} created to insert the rows. The job is available only after the
 * upload has finished and the channel has been closed; until then this method returns
 * {@code null}.
 */
public Job getJob() {
return job;
}

static class StateImpl
extends BaseWriteChannel.BaseState<BigQueryOptions, WriteChannelConfiguration> {

private static final long serialVersionUID = -787362105981823738L;
private static final long serialVersionUID = -2692851818766876346L;

private final Job job;

// Builds serializable channel state from the builder, capturing the insert job
// (null if the upload had not yet completed when the state was saved).
StateImpl(Builder builder) {
super(builder);
this.job = builder.job;
}

static class Builder
extends BaseWriteChannel.BaseState.Builder<BigQueryOptions, WriteChannelConfiguration> {

// Insert job captured with the state; null until the upload completes.
private final Job job;

/**
 * Creates a state builder for a table-data write channel.
 *
 * <p>The diff residue previously left both the old three-parameter signature line and the
 * new four-parameter one in place; only the new signature (which threads the job through)
 * is kept, matching the {@code builder(...)} factory below.
 */
private Builder(BigQueryOptions options, WriteChannelConfiguration configuration,
    String uploadId, Job job) {
  super(options, configuration, uploadId);
  this.job = job;
}

public RestorableState<WriteChannel> build() {
Expand All @@ -97,15 +124,33 @@ public RestorableState<WriteChannel> build() {
}

static Builder builder(BigQueryOptions options, WriteChannelConfiguration config,
String uploadId) {
return new Builder(options, config, uploadId);
String uploadId, Job job) {
return new Builder(options, config, uploadId, job);
}

@Override
public WriteChannel restore() {
  // Recreate the channel, replay the captured upload state, then reattach the job
  // (non-null only if the upload had already completed when the state was saved).
  TableDataWriteChannel restored = new TableDataWriteChannel(serviceOptions, entity, uploadId);
  restored.restore(this);
  restored.setJob(job);
  return restored;
}

@Override
public int hashCode() {
  // Fold the job into the base state's hash; equal states produce equal hashes.
  final int baseHash = super.hashCode();
  return Objects.hash(baseHash, job);
}

@Override
public boolean equals(Object obj) {
  // Equal only to another StateImpl whose base state and insert job both match.
  if (!(obj instanceof StateImpl)) {
    return false;
  }
  StateImpl other = (StateImpl) obj;
  return super.equals(other) && Objects.equals(job, other.job);
}

@Override
protected MoreObjects.ToStringHelper toStringHelper() {
  // Extend the base state's string representation with the insert job.
  MoreObjects.ToStringHelper helper = super.toStringHelper();
  return helper.add("job", job);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,19 @@ Tuple<String, Iterable<TableRow>> listTableData(String projectId, String dataset
String open(JobConfiguration configuration);

/**
* Uploads the provided data to the resumable upload session at the specified position.
* Uploads the provided data to the resumable upload session at the specified position. This
* method returns the job created to insert the rows, only when {@code last} is {@code true}.
*
* @param uploadId the resumable upload session URI
* @param toWrite a byte array of data to upload
* @param toWriteOffset offset in the {@code toWrite} param to start writing from
* @param destOffset offset in the destination where to upload data to
* @param length the number of bytes to upload
* @param last {@code true} indicates that the last chunk is being uploaded
* @return returns the job created to insert the rows, only when {@code last} is {@code true}.
* Returns {@code null} otherwise
* @throws BigQueryException upon failure
*/
void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
boolean last);
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,13 @@ public String open(JobConfiguration configuration) {
}

@Override
public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
public Job write(String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length,
boolean last) {
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest = bigquery.getRequestFactory().buildPutRequest(url,
new ByteArrayContent(null, toWrite, toWriteOffset, length));
HttpRequest httpRequest = bigquery.getRequestFactory()
.buildPutRequest(url, new ByteArrayContent(null, toWrite, toWriteOffset, length));
httpRequest.setParser(bigquery.getObjectParser());
long limit = destOffset + length;
StringBuilder range = new StringBuilder("bytes ");
range.append(destOffset).append('-').append(limit - 1).append('/');
Expand All @@ -450,8 +451,9 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
int code;
String message;
IOException exception = null;
HttpResponse response = null;
try {
HttpResponse response = httpRequest.execute();
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
} catch (HttpResponseException ex) {
Expand All @@ -466,6 +468,7 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, long destO
}
throw new BigQueryException(code, message);
}
return last && response != null ? response.parseAs(Job.class) : null;
} catch (IOException ex) {
throw translate(ex);
}
Expand Down
Loading