|
24 | 24 |
|
25 | 25 | from google.cloud.bigquery.client import Client
|
26 | 26 | from google.cloud.bigquery import _job_helpers
|
27 |
| -from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY |
| 27 | +import google.cloud.bigquery.retry |
28 | 28 |
|
29 | 29 | from .helpers import make_connection
|
30 | 30 |
|
@@ -126,6 +126,168 @@ def api_request(method, path, query_params=None, data=None, **kw):
|
126 | 126 | assert job.job_id == orig_job_id
|
127 | 127 |
|
128 | 128 |
|
| 129 | +def test_query_retry_with_default_retry_and_ambiguous_errors_only_retries_with_failed_job( |
| 130 | + client, monkeypatch |
| 131 | +): |
| 132 | + """ |
| 133 | + Some errors like 'rateLimitExceeded' can be ambiguous. Make sure we only |
| 134 | + retry the job when we know for sure that the job has failed for a retriable |
| 135 | + reason. We can only be sure after a "successful" call to jobs.get to fetch |
| 136 | + the failed job status. |
| 137 | + """ |
| 138 | + job_counter = 0 |
| 139 | + |
| 140 | + def make_job_id(*args, **kwargs): |
| 141 | + nonlocal job_counter |
| 142 | + job_counter += 1 |
| 143 | + return f"{job_counter}" |
| 144 | + |
| 145 | + monkeypatch.setattr(_job_helpers, "make_job_id", make_job_id) |
| 146 | + |
| 147 | + project = client.project |
| 148 | + job_reference_1 = {"projectId": project, "jobId": "1", "location": "test-loc"} |
| 149 | + job_reference_2 = {"projectId": project, "jobId": "2", "location": "test-loc"} |
| 150 | + NUM_API_RETRIES = 2 |
| 151 | + |
| 152 | + # This error is modeled after a real customer exception in |
| 153 | +    # https://github.com/googleapis/python-bigquery/issues/707. |
| 154 | + internal_error = google.api_core.exceptions.InternalServerError( |
| 155 | + "Job failed just because...", |
| 156 | + errors=[ |
| 157 | + {"reason": "internalError"}, |
| 158 | + ], |
| 159 | + ) |
| 160 | + responses = [ |
| 161 | + # jobs.insert |
| 162 | + {"jobReference": job_reference_1, "status": {"state": "PENDING"}}, |
| 163 | + # jobs.get |
| 164 | + {"jobReference": job_reference_1, "status": {"state": "RUNNING"}}, |
| 165 | + # jobs.getQueryResults x2 |
| 166 | + # |
| 167 | + # Note: internalError is ambiguous in jobs.getQueryResults. The |
| 168 | +        # problem could be at the Google Frontend level or it could be because |
| 169 | + # the job has failed due to some transient issues and the BigQuery |
| 170 | + # REST API is translating the job failed status into failure HTTP |
| 171 | + # codes. |
| 172 | + # |
| 173 | + # TODO(GH#1903): We shouldn't retry nearly this many times when we get |
| 174 | + # ambiguous errors from jobs.getQueryResults. |
| 175 | + # See: https://github.com/googleapis/python-bigquery/issues/1903 |
| 176 | + internal_error, |
| 177 | + internal_error, |
| 178 | + # jobs.get -- the job has failed |
| 179 | + { |
| 180 | + "jobReference": job_reference_1, |
| 181 | + "status": {"state": "DONE", "errorResult": {"reason": "internalError"}}, |
| 182 | + }, |
| 183 | + # jobs.insert |
| 184 | + {"jobReference": job_reference_2, "status": {"state": "PENDING"}}, |
| 185 | + # jobs.get |
| 186 | + {"jobReference": job_reference_2, "status": {"state": "RUNNING"}}, |
| 187 | + # jobs.getQueryResults |
| 188 | + {"jobReference": job_reference_2, "jobComplete": True}, |
| 189 | + # jobs.get |
| 190 | + {"jobReference": job_reference_2, "status": {"state": "DONE"}}, |
| 191 | + ] |
| 192 | + |
| 193 | + conn = client._connection = make_connection() |
| 194 | + conn.api_request.side_effect = responses |
| 195 | + |
| 196 | + with freezegun.freeze_time( |
| 197 | + # Note: because of exponential backoff and a bit of jitter, |
| 198 | + # NUM_API_RETRIES will get less accurate the greater the value. |
| 199 | + # We add 1 because we know there will be at least some additional |
| 200 | + # calls to fetch the time / sleep before the retry deadline is hit. |
| 201 | + auto_tick_seconds=( |
| 202 | + google.cloud.bigquery.retry._DEFAULT_RETRY_DEADLINE / NUM_API_RETRIES |
| 203 | + ) |
| 204 | + + 1, |
| 205 | + ): |
| 206 | + job = client.query("select 1") |
| 207 | + job.result() |
| 208 | + |
| 209 | + conn.api_request.assert_has_calls( |
| 210 | + [ |
| 211 | + # jobs.insert |
| 212 | + mock.call( |
| 213 | + method="POST", |
| 214 | + path="/projects/PROJECT/jobs", |
| 215 | + data={ |
| 216 | + "jobReference": {"jobId": "1", "projectId": "PROJECT"}, |
| 217 | + "configuration": { |
| 218 | + "query": {"useLegacySql": False, "query": "select 1"} |
| 219 | + }, |
| 220 | + }, |
| 221 | + timeout=None, |
| 222 | + ), |
| 223 | + # jobs.get |
| 224 | + mock.call( |
| 225 | + method="GET", |
| 226 | + path="/projects/PROJECT/jobs/1", |
| 227 | + query_params={"location": "test-loc"}, |
| 228 | + timeout=None, |
| 229 | + ), |
| 230 | + # jobs.getQueryResults x2 |
| 231 | + mock.call( |
| 232 | + method="GET", |
| 233 | + path="/projects/PROJECT/queries/1", |
| 234 | + query_params={"maxResults": 0, "location": "test-loc"}, |
| 235 | + timeout=None, |
| 236 | + ), |
| 237 | + mock.call( |
| 238 | + method="GET", |
| 239 | + path="/projects/PROJECT/queries/1", |
| 240 | + query_params={"maxResults": 0, "location": "test-loc"}, |
| 241 | + timeout=None, |
| 242 | + ), |
| 243 | + # jobs.get -- verify that the job has failed |
| 244 | + mock.call( |
| 245 | + method="GET", |
| 246 | + path="/projects/PROJECT/jobs/1", |
| 247 | + query_params={"location": "test-loc"}, |
| 248 | + timeout=None, |
| 249 | + ), |
| 250 | + # jobs.insert |
| 251 | + mock.call( |
| 252 | + method="POST", |
| 253 | + path="/projects/PROJECT/jobs", |
| 254 | + data={ |
| 255 | + "jobReference": { |
| 256 | + # Make sure that we generated a new job ID. |
| 257 | + "jobId": "2", |
| 258 | + "projectId": "PROJECT", |
| 259 | + }, |
| 260 | + "configuration": { |
| 261 | + "query": {"useLegacySql": False, "query": "select 1"} |
| 262 | + }, |
| 263 | + }, |
| 264 | + timeout=None, |
| 265 | + ), |
| 266 | + # jobs.get |
| 267 | + mock.call( |
| 268 | + method="GET", |
| 269 | + path="/projects/PROJECT/jobs/2", |
| 270 | + query_params={"location": "test-loc"}, |
| 271 | + timeout=None, |
| 272 | + ), |
| 273 | + # jobs.getQueryResults |
| 274 | + mock.call( |
| 275 | + method="GET", |
| 276 | + path="/projects/PROJECT/queries/2", |
| 277 | + query_params={"maxResults": 0, "location": "test-loc"}, |
| 278 | + timeout=None, |
| 279 | + ), |
| 280 | + # jobs.get |
| 281 | + mock.call( |
| 282 | + method="GET", |
| 283 | + path="/projects/PROJECT/jobs/2", |
| 284 | + query_params={"location": "test-loc"}, |
| 285 | + timeout=None, |
| 286 | + ), |
| 287 | + ] |
| 288 | + ) |
| 289 | + |
| 290 | + |
129 | 291 | # With job_retry_on_query, we're testing 4 scenarios:
|
130 | 292 | # - Pass None retry to `query`.
|
131 | 293 | # - Pass None retry to `result`.
|
@@ -301,8 +463,8 @@ def test_query_and_wait_retries_job_for_DDL_queries():
|
301 | 463 | job_config=None,
|
302 | 464 | page_size=None,
|
303 | 465 | max_results=None,
|
304 |
| - retry=DEFAULT_JOB_RETRY, |
305 |
| - job_retry=DEFAULT_JOB_RETRY, |
| 466 | + retry=google.cloud.bigquery.retry.DEFAULT_RETRY, |
| 467 | + job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY, |
306 | 468 | )
|
307 | 469 | assert len(list(rows)) == 4
|
308 | 470 |
|
|
0 commit comments