Skip to content

Commit 1f91ae7

Browse files
feat: Add ability to specify RetryOptions and BigQueryRetryConfig when create job and waitFor (#3398)
* feat: initial implementation of customizable BigQueryRetryConfig * Add unit and integration tests * Fix lint issues * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add additional comments to Job.waitFor() for BigQueryRetryConfig --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 95c8d6f commit 1f91ae7

File tree

7 files changed

+370
-21
lines changed

7 files changed

+370
-21
lines changed

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) {
561561
/** Class for specifying table get and create options. */
562562
class JobOption extends Option {
563563

564-
private static final long serialVersionUID = -3111736712316353665L;
564+
private static final long serialVersionUID = -3111736712316353664L;
565565

566566
private JobOption(BigQueryRpc.Option option, Object value) {
567567
super(option, value);
@@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) {
578578
return new JobOption(
579579
BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields));
580580
}
581+
582+
/** Returns an option to specify the job's BigQuery retry configuration. */
583+
public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) {
584+
return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig);
585+
}
586+
587+
/** Returns an option to specify the job's retry options. */
588+
public static JobOption retryOptions(RetryOption... options) {
589+
return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options);
590+
}
581591
}
582592

583593
/** Class for specifying query results options. */

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.cloud.Policy;
4040
import com.google.cloud.RetryHelper;
4141
import com.google.cloud.RetryHelper.RetryHelperException;
42+
import com.google.cloud.RetryOption;
4243
import com.google.cloud.Tuple;
4344
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
4445
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
@@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() {
415416
}
416417
}
417418
},
418-
getOptions().getRetrySettings(),
419+
getRetryOptions(optionsMap) != null
420+
? RetryOption.mergeToSettings(
421+
getOptions().getRetrySettings(), getRetryOptions(optionsMap))
422+
: getOptions().getRetrySettings(),
419423
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
420424
getOptions().getClock(),
421-
DEFAULT_RETRY_CONFIG));
425+
getBigQueryRetryConfig(optionsMap) != null
426+
? getBigQueryRetryConfig(optionsMap)
427+
: DEFAULT_RETRY_CONFIG));
422428
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
423429
throw BigQueryException.translateAndThrow(e);
424430
}
@@ -1628,4 +1634,13 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call()
16281634
}
16291635
return optionMap;
16301636
}
1637+
1638+
static BigQueryRetryConfig getBigQueryRetryConfig(Map<BigQueryRpc.Option, ?> options) {
1639+
return (BigQueryRetryConfig)
1640+
options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null);
1641+
}
1642+
1643+
static RetryOption[] getRetryOptions(Map<BigQueryRpc.Option, ?> options) {
1644+
return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null);
1645+
}
16311646
}

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java

+46-5
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,21 @@ public boolean isDone() {
196196
Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS));
197197
return job == null || JobStatus.State.DONE.equals(job.getStatus().getState());
198198
}
199+
200+
/** See {@link #waitFor(BigQueryRetryConfig, RetryOption...)} */
201+
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
202+
return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions);
203+
}
204+
199205
/**
200206
* Blocks until this job completes its execution, either failing or succeeding. This method
201207
* returns current job's latest information. If the job no longer exists, this method returns
202208
* {@code null}. By default, the job status is checked using jittered exponential backoff with 1
203209
* second as an initial delay, 2.0 as a backoff factor, 1 minute as maximum delay between polls,
204-
* 12 hours as a total timeout and unlimited number of attempts.
210+
* 12 hours as a total timeout and unlimited number of attempts. For query jobs, the job status
211+
* check can be configured to retry on specific BigQuery error messages using {@link
212+
* BigQueryRetryConfig}. This {@link BigQueryRetryConfig} configuration is not available for
213+
* non-query jobs.
205214
*
206215
* <p>Example usage of {@code waitFor()}.
207216
*
@@ -232,18 +241,46 @@ public boolean isDone() {
232241
* }
233242
* }</pre>
234243
*
244+
* <p>Example usage of {@code waitFor()} with BigQuery retry configuration to retry on rate limit
245+
* exceeded error messages for query jobs.
246+
*
247+
* <pre>{@code
248+
* Job completedJob =
249+
* job.waitFor(
250+
* BigQueryRetryConfig.newBuilder()
251+
* .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
252+
* .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
253+
* .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
254+
* .build());
255+
* if (completedJob == null) {
256+
* // job no longer exists
257+
* } else if (completedJob.getStatus().getError() != null) {
258+
* // job failed, handle error
259+
* } else {
260+
* // job completed successfully
261+
* }
262+
* }</pre>
263+
*
264+
* @param bigQueryRetryConfig configures retries for query jobs for BigQuery failures
235265
* @param waitOptions options to configure checking period and timeout
236266
* @throws BigQueryException upon failure, check {@link BigQueryException#getCause()} for details
237267
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
238268
* to complete
239269
*/
240-
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
270+
public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
271+
throws InterruptedException {
272+
return waitForInternal(bigQueryRetryConfig, waitOptions);
273+
}
274+
275+
private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
276+
throws InterruptedException {
241277
checkNotDryRun("waitFor");
242278
Object completedJobResponse;
243279
if (getConfiguration().getType() == Type.QUERY) {
244280
completedJobResponse =
245281
waitForQueryResults(
246282
RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions),
283+
bigQueryRetryConfig,
247284
DEFAULT_QUERY_WAIT_OPTIONS);
248285
} else {
249286
completedJobResponse =
@@ -294,7 +331,9 @@ public TableResult getQueryResults(QueryResultsOption... options)
294331

295332
QueryResponse response =
296333
waitForQueryResults(
297-
DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0]));
334+
DEFAULT_JOB_WAIT_SETTINGS,
335+
DEFAULT_RETRY_CONFIG,
336+
waitOptions.toArray(new QueryResultsOption[0]));
298337

299338
// Get the job resource to determine if it has errored.
300339
Job job = this;
@@ -334,7 +373,9 @@ public TableResult getQueryResults(QueryResultsOption... options)
334373
}
335374

336375
private QueryResponse waitForQueryResults(
337-
RetrySettings retrySettings, final QueryResultsOption... resultsOptions)
376+
RetrySettings retrySettings,
377+
BigQueryRetryConfig bigQueryRetryConfig,
378+
final QueryResultsOption... resultsOptions)
338379
throws InterruptedException {
339380
if (getConfiguration().getType() != Type.QUERY) {
340381
throw new UnsupportedOperationException(
@@ -360,7 +401,7 @@ public boolean shouldRetry(
360401
}
361402
},
362403
options.getClock(),
363-
DEFAULT_RETRY_CONFIG);
404+
bigQueryRetryConfig);
364405
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
365406
throw BigQueryException.translateAndThrow(e);
366407
}

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ enum Option {
5757
STATE_FILTER("stateFilter"),
5858
TIMEOUT("timeoutMs"),
5959
REQUESTED_POLICY_VERSION("requestedPolicyVersion"),
60-
TABLE_METADATA_VIEW("view");
60+
TABLE_METADATA_VIEW("view"),
61+
RETRY_OPTIONS("retryOptions"),
62+
BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig");
6163

6264
private final String value;
6365

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

+114
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.api.services.bigquery.model.*;
2828
import com.google.api.services.bigquery.model.JobStatistics;
2929
import com.google.cloud.Policy;
30+
import com.google.cloud.RetryOption;
3031
import com.google.cloud.ServiceOptions;
3132
import com.google.cloud.Tuple;
3233
import com.google.cloud.bigquery.BigQuery.JobOption;
@@ -1594,6 +1595,119 @@ public void testCreateJobFailureShouldRetry() {
15941595
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
15951596
}
15961597

1598+
@Test
1599+
public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() {
1600+
// Validate create job with BigQueryRetryConfig that retries on rate limit error message.
1601+
JobOption bigQueryRetryConfigOption =
1602+
JobOption.bigQueryRetryConfig(
1603+
BigQueryRetryConfig.newBuilder()
1604+
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
1605+
.retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
1606+
.retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
1607+
.build());
1608+
1609+
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
1610+
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
1611+
.thenThrow(
1612+
new BigQueryException(
1613+
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
1614+
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
1615+
.thenReturn(newJobPb());
1616+
1617+
bigquery = options.getService();
1618+
bigquery =
1619+
options
1620+
.toBuilder()
1621+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
1622+
.build()
1623+
.getService();
1624+
1625+
((BigQueryImpl) bigquery)
1626+
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
1627+
verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
1628+
}
1629+
1630+
@Test
1631+
public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() {
1632+
// Validate create job with BigQueryRetryConfig that does not retry on rate limit error message.
1633+
JobOption bigQueryRetryConfigOption =
1634+
JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build());
1635+
1636+
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
1637+
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
1638+
.thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG));
1639+
1640+
bigquery = options.getService();
1641+
bigquery =
1642+
options
1643+
.toBuilder()
1644+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
1645+
.build()
1646+
.getService();
1647+
1648+
try {
1649+
((BigQueryImpl) bigquery)
1650+
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
1651+
fail("JobException expected");
1652+
} catch (BigQueryException e) {
1653+
assertNotNull(e.getMessage());
1654+
}
1655+
// Verify that getQueryResults is attempted only once and not retried since the error message
1656+
// does not match.
1657+
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
1658+
}
1659+
1660+
@Test
1661+
public void testCreateJobWithRetryOptionsFailureShouldRetry() {
1662+
// Validate create job with RetryOptions.
1663+
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4));
1664+
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
1665+
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
1666+
.thenThrow(new BigQueryException(500, "InternalError"))
1667+
.thenThrow(new BigQueryException(502, "Bad Gateway"))
1668+
.thenThrow(new BigQueryException(503, "Service Unavailable"))
1669+
.thenReturn(newJobPb());
1670+
1671+
bigquery = options.getService();
1672+
bigquery =
1673+
options
1674+
.toBuilder()
1675+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
1676+
.build()
1677+
.getService();
1678+
1679+
((BigQueryImpl) bigquery)
1680+
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
1681+
verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
1682+
}
1683+
1684+
@Test
1685+
public void testCreateJobWithRetryOptionsFailureShouldNotRetry() {
1686+
// Validate create job with RetryOptions that only attempts once (no retry).
1687+
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1));
1688+
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
1689+
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
1690+
.thenThrow(new BigQueryException(500, "InternalError"))
1691+
.thenReturn(newJobPb());
1692+
1693+
bigquery = options.getService();
1694+
bigquery =
1695+
options
1696+
.toBuilder()
1697+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
1698+
.build()
1699+
.getService();
1700+
1701+
try {
1702+
((BigQueryImpl) bigquery)
1703+
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
1704+
fail("JobException expected");
1705+
} catch (BigQueryException e) {
1706+
assertNotNull(e.getMessage());
1707+
}
1708+
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
1709+
}
1710+
15971711
@Test
15981712
public void testCreateJobWithSelectedFields() {
15991713
when(bigqueryRpcMock.create(

0 commit comments

Comments
 (0)