19
19
import static java .util .Collections .singletonMap ;
20
20
import static java .util .concurrent .TimeUnit .SECONDS ;
21
21
22
+ import build .bazel .remote .execution .v2 .Digest ;
22
23
import com .google .bytestream .ByteStreamGrpc ;
23
24
import com .google .bytestream .ByteStreamGrpc .ByteStreamFutureStub ;
24
25
import com .google .bytestream .ByteStreamProto .QueryWriteStatusRequest ;
64
65
/**
 * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service.
 *
 * <p>The uploader supports reference counting to easily be shared between components with different
 * lifecycles. After instantiation the reference count is {@code 1}.
 *
 * <p>See {@link ReferenceCounted} for more information on reference counting.
 */
72
73
class ByteStreamUploader extends AbstractReferenceCounted {
73
74
@@ -81,12 +82,12 @@ class ByteStreamUploader extends AbstractReferenceCounted {
81
82
82
83
private final Object lock = new Object ();
83
84
84
/** Contains the hash codes of already uploaded blobs. */
85
86
@ GuardedBy ("lock" )
86
87
private final Set <HashCode > uploadedBlobs = new HashSet <>();
87
88
88
89
@ GuardedBy ("lock" )
89
- private final Map <HashCode , ListenableFuture <Void >> uploadsInProgress = new HashMap <>();
90
+ private final Map <Digest , ListenableFuture <Void >> uploadsInProgress = new HashMap <>();
90
91
91
92
@ GuardedBy ("lock" )
92
93
private boolean isShutdown ;
@@ -179,8 +180,8 @@ public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
179
180
* Cancels all running uploads. The method returns immediately and does NOT wait for the uploads
180
181
* to be cancelled.
181
182
*
182
- * <p>This method should not be called directly, but will be called implicitly when the
183
- * reference count reaches {@code 0}.
183
+ * <p>This method should not be called directly, but will be called implicitly when the reference
184
+ * count reaches {@code 0}.
184
185
*/
185
186
@ VisibleForTesting
186
187
void shutdown () {
@@ -199,6 +200,16 @@ void shutdown() {
199
200
}
200
201
}
201
202
203
+ /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
204
+ @ Deprecated
205
+ @ VisibleForTesting
206
+ public ListenableFuture <Void > uploadBlobAsync (
207
+ HashCode hash , Chunker chunker , boolean forceUpload ) {
208
+ Digest digest =
209
+ Digest .newBuilder ().setHash (hash .toString ()).setSizeBytes (chunker .getSize ()).build ();
210
+ return uploadBlobAsync (digest , chunker , forceUpload );
211
+ }
212
+
202
213
/**
203
214
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
204
215
* immediately and one can listen to the returned future for the success/failure of the upload.
@@ -209,32 +220,32 @@ void shutdown() {
209
220
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
210
221
* performed. This is transparent to the user of this API.
211
222
*
212
- * @param hash the hash of the data to upload.
223
+ * @param digest the {@link Digest} of the data to upload.
213
224
* @param chunker the data to upload.
214
225
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
215
226
* uploaded, if {@code true} the blob is uploaded.
216
227
* @throws IOException when reading of the {@link Chunker}s input source fails
217
228
*/
218
229
public ListenableFuture <Void > uploadBlobAsync (
219
- HashCode hash , Chunker chunker , boolean forceUpload ) {
230
+ Digest digest , Chunker chunker , boolean forceUpload ) {
220
231
synchronized (lock ) {
221
232
checkState (!isShutdown , "Must not call uploadBlobs after shutdown." );
222
233
223
- if (!forceUpload && uploadedBlobs .contains (hash )) {
234
+ if (!forceUpload && uploadedBlobs .contains (HashCode . fromString ( digest . getHash ()) )) {
224
235
return Futures .immediateFuture (null );
225
236
}
226
237
227
- ListenableFuture <Void > inProgress = uploadsInProgress .get (hash );
238
+ ListenableFuture <Void > inProgress = uploadsInProgress .get (digest );
228
239
if (inProgress != null ) {
229
240
return inProgress ;
230
241
}
231
242
232
243
ListenableFuture <Void > uploadResult =
233
244
Futures .transform (
234
- startAsyncUpload (hash , chunker ),
245
+ startAsyncUpload (digest , chunker ),
235
246
(v ) -> {
236
247
synchronized (lock ) {
237
- uploadedBlobs .add (hash );
248
+ uploadedBlobs .add (HashCode . fromString ( digest . getHash ()) );
238
249
}
239
250
return null ;
240
251
},
@@ -244,14 +255,20 @@ public ListenableFuture<Void> uploadBlobAsync(
244
255
Futures .catchingAsync (
245
256
uploadResult ,
246
257
StatusRuntimeException .class ,
247
- (sre ) -> Futures .immediateFailedFuture (new IOException (sre )),
258
+ (sre ) ->
259
+ Futures .immediateFailedFuture (
260
+ new IOException (
261
+ String .format (
262
+ "Error while uploading artifact with digest '%s/%s'" ,
263
+ digest .getHash (), digest .getSizeBytes ()),
264
+ sre )),
248
265
MoreExecutors .directExecutor ());
249
266
250
- uploadsInProgress .put (hash , uploadResult );
267
+ uploadsInProgress .put (digest , uploadResult );
251
268
uploadResult .addListener (
252
269
() -> {
253
270
synchronized (lock ) {
254
- uploadsInProgress .remove (hash );
271
+ uploadsInProgress .remove (digest );
255
272
}
256
273
},
257
274
MoreExecutors .directExecutor ());
@@ -267,25 +284,33 @@ boolean uploadsInProgress() {
267
284
}
268
285
}
269
286
270
- private static String buildUploadResourceName (
271
- String instanceName , UUID uuid , HashCode hash , long size ) {
272
- String resourceName = format ("uploads/%s/blobs/%s/%d" , uuid , hash , size );
287
+ private static String buildUploadResourceName (String instanceName , UUID uuid , Digest digest ) {
288
+ String resourceName =
289
+ format ("uploads/%s/blobs/%s/%d" , uuid , digest . getHash (), digest . getSizeBytes () );
273
290
if (!Strings .isNullOrEmpty (instanceName )) {
274
291
resourceName = instanceName + "/" + resourceName ;
275
292
}
276
293
return resourceName ;
277
294
}
278
295
279
296
/** Starts a file upload and returns a future representing the upload. */
280
- private ListenableFuture <Void > startAsyncUpload (HashCode hash , Chunker chunker ) {
297
+ private ListenableFuture <Void > startAsyncUpload (Digest digest , Chunker chunker ) {
281
298
try {
282
299
chunker .reset ();
283
300
} catch (IOException e ) {
284
301
return Futures .immediateFailedFuture (e );
285
302
}
286
303
304
+ if (chunker .getSize () != digest .getSizeBytes ()) {
305
+ return Futures .immediateFailedFuture (
306
+ new IllegalStateException (
307
+ String .format (
308
+ "Expected chunker size of %d, got %d" ,
309
+ digest .getSizeBytes (), chunker .getSize ())));
310
+ }
311
+
287
312
UUID uploadId = UUID .randomUUID ();
288
- String resourceName = buildUploadResourceName (instanceName , uploadId , hash , chunker . getSize () );
313
+ String resourceName = buildUploadResourceName (instanceName , uploadId , digest );
289
314
AsyncUpload newUpload =
290
315
new AsyncUpload (
291
316
channel , callCredentialsProvider , callTimeoutSecs , retrier , resourceName , chunker );
0 commit comments