13
13
# limitations under the License.
14
14
15
15
import io
16
+ import warnings
16
17
17
18
from google .api_core .exceptions import RequestRangeNotSatisfiable
19
+ from google .cloud .storage ._helpers import _NUM_RETRIES_MESSAGE
20
+ from google .cloud .storage .retry import DEFAULT_RETRY
21
+ from google .cloud .storage .retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
22
+ from google .cloud .storage .retry import ConditionalRetryPolicy
23
+
18
24
19
25
# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
20
26
CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB
28
34
"if_metageneration_match" ,
29
35
"if_metageneration_not_match" ,
30
36
"timeout" ,
37
+ "retry" ,
31
38
}
32
39
33
40
# Valid keyword arguments for upload methods.
34
41
# Note: Changes here need to be reflected in the blob.open() docstring.
35
42
VALID_UPLOAD_KWARGS = {
36
43
"content_type" ,
37
- "num_retries" ,
38
44
"predefined_acl" ,
45
+ "num_retries" ,
39
46
"if_generation_match" ,
40
47
"if_generation_not_match" ,
41
48
"if_metageneration_match" ,
42
49
"if_metageneration_not_match" ,
43
50
"timeout" ,
44
51
"checksum" ,
52
+ "retry" ,
45
53
}
46
54
47
55
@@ -58,13 +66,35 @@ class BlobReader(io.BufferedIOBase):
58
66
bytes than the chunk_size are requested, the remainder is buffered.
59
67
The default is the chunk_size of the blob, or 40MiB.
60
68
69
+ :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
70
+ :param retry: (Optional) How to retry the RPC. A None value will disable
71
+ retries. A google.api_core.retry.Retry value will enable retries,
72
+ and the object will define retriable response codes and errors and
73
+ configure backoff and timeout options.
74
+
75
+ A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
76
+ Retry object and activates it only if certain conditions are met.
77
+ This class exists to provide safe defaults for RPC calls that are
78
+ not technically safe to retry normally (due to potential data
79
+ duplication or other side-effects) but become safe to retry if a
80
+ condition such as if_metageneration_match is set.
81
+
82
+ See the retry.py source code and docstrings in this package
83
+ (google.cloud.storage.retry) for information on retry types and how
84
+ to configure them.
85
+
86
+ Media operations (downloads and uploads) do not support non-default
87
+ predicates in a Retry object. The default will always be used. Other
88
+ configuration changes for Retry objects such as delays and deadlines
89
+ are respected.
90
+
61
91
:param download_kwargs: Keyword arguments to pass to the underlying API
62
92
calls. The following arguments are supported: "if_generation_match",
63
93
"if_generation_not_match", "if_metageneration_match",
64
94
"if_metageneration_not_match", "timeout".
65
95
"""
66
96
67
- def __init__ (self , blob , chunk_size = None , ** download_kwargs ):
97
+ def __init__ (self , blob , chunk_size = None , retry = DEFAULT_RETRY , ** download_kwargs ):
68
98
"""docstring note that download_kwargs also used for reload()"""
69
99
for kwarg in download_kwargs :
70
100
if kwarg not in VALID_DOWNLOAD_KWARGS :
@@ -76,6 +106,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs):
76
106
self ._pos = 0
77
107
self ._buffer = io .BytesIO ()
78
108
self ._chunk_size = chunk_size or blob .chunk_size or DEFAULT_CHUNK_SIZE
109
+ self ._retry = retry
79
110
self ._download_kwargs = download_kwargs
80
111
81
112
def read (self , size = - 1 ):
@@ -102,6 +133,7 @@ def read(self, size=-1):
102
133
start = fetch_start ,
103
134
end = fetch_end ,
104
135
checksum = None ,
136
+ retry = self ._retry ,
105
137
** self ._download_kwargs
106
138
)
107
139
except RequestRangeNotSatisfiable :
@@ -197,14 +229,43 @@ class BlobWriter(io.BufferedIOBase):
197
229
changes the behavior of flush() to conform to TextIOWrapper's
198
230
expectations.
199
231
232
+ :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
233
+ :param retry: (Optional) How to retry the RPC. A None value will disable
234
+ retries. A google.api_core.retry.Retry value will enable retries,
235
+ and the object will define retriable response codes and errors and
236
+ configure backoff and timeout options.
237
+
238
+ A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
239
+ Retry object and activates it only if certain conditions are met.
240
+ This class exists to provide safe defaults for RPC calls that are
241
+ not technically safe to retry normally (due to potential data
242
+ duplication or other side-effects) but become safe to retry if a
243
+ condition such as if_metageneration_match is set.
244
+
245
+ See the retry.py source code and docstrings in this package
246
+ (google.cloud.storage.retry) for information on retry types and how
247
+ to configure them.
248
+
249
+ Media operations (downloads and uploads) do not support non-default
250
+ predicates in a Retry object. The default will always be used. Other
251
+ configuration changes for Retry objects such as delays and deadlines
252
+ are respected.
253
+
200
254
:param upload_kwargs: Keyword arguments to pass to the underlying API
201
255
calls. The following arguments are supported: "if_generation_match",
202
256
"if_generation_not_match", "if_metageneration_match",
203
257
"if_metageneration_not_match", "timeout", "content_type",
204
258
"num_retries", "predefined_acl", "checksum".
205
259
"""
206
260
207
- def __init__ (self , blob , chunk_size = None , text_mode = False , ** upload_kwargs ):
261
+ def __init__ (
262
+ self ,
263
+ blob ,
264
+ chunk_size = None ,
265
+ text_mode = False ,
266
+ retry = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED ,
267
+ ** upload_kwargs
268
+ ):
208
269
for kwarg in upload_kwargs :
209
270
if kwarg not in VALID_UPLOAD_KWARGS :
210
271
raise ValueError (
@@ -219,6 +280,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
219
280
# In text mode this class will be wrapped and TextIOWrapper requires a
220
281
# different behavior of flush().
221
282
self ._text_mode = text_mode
283
+ self ._retry = retry
222
284
self ._upload_kwargs = upload_kwargs
223
285
224
286
@property
@@ -259,20 +321,32 @@ def write(self, b):
259
321
return pos
260
322
261
323
def _initiate_upload (self ):
324
+ # num_retries is only supported for backwards-compatibility reasons.
262
325
num_retries = self ._upload_kwargs .pop ("num_retries" , None )
326
+ retry = self ._retry
263
327
content_type = self ._upload_kwargs .pop ("content_type" , None )
264
328
265
- if (
266
- self ._upload_kwargs .get ("if_metageneration_match" ) is None
267
- and num_retries is None
268
- ):
269
- # Uploads are only idempotent (safe to retry) if
270
- # if_metageneration_match is set. If it is not set, the default
271
- # num_retries should be 0. Note: Because retry logic for uploads is
272
- # provided by the google-resumable-media-python package, it doesn't
273
- # use the ConditionalRetryStrategy class used in other API calls in
274
- # this library to solve this problem.
275
- num_retries = 0
329
+ if num_retries is not None :
330
+ warnings .warn (_NUM_RETRIES_MESSAGE , DeprecationWarning , stacklevel = 2 )
331
+ # num_retries and retry are mutually exclusive. If num_retries is
332
+ # set and retry is exactly the default, then nullify retry for
333
+ # backwards compatibility.
334
+ if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED :
335
+ retry = None
336
+
337
+ # Handle ConditionalRetryPolicy.
338
+ if isinstance (retry , ConditionalRetryPolicy ):
339
+ # Conditional retries are designed for non-media calls, which change
340
+ # arguments into query_params dictionaries. Media operations work
341
+ # differently, so here we make a "fake" query_params to feed to the
342
+ # ConditionalRetryPolicy.
343
+ query_params = {
344
+ "ifGenerationMatch" : self ._upload_kwargs .get ("if_generation_match" ),
345
+ "ifMetagenerationMatch" : self ._upload_kwargs .get (
346
+ "if_metageneration_match"
347
+ ),
348
+ }
349
+ retry = retry .get_retry_policy_if_conditions_met (query_params = query_params )
276
350
277
351
self ._upload_and_transport = self ._blob ._initiate_resumable_upload (
278
352
self ._blob .bucket .client ,
@@ -281,6 +355,7 @@ def _initiate_upload(self):
281
355
None ,
282
356
num_retries ,
283
357
chunk_size = self ._chunk_size ,
358
+ retry = retry ,
284
359
** self ._upload_kwargs
285
360
)
286
361
0 commit comments