20
20
import com .google .api .gax .rpc .ServerStreamingCallable ;
21
21
import com .google .api .gax .rpc .UnaryCallable ;
22
22
import com .google .cloud .bigquery .storage .v1 .stub .EnhancedBigQueryReadStub ;
23
+ import io .opentelemetry .api .common .Attributes ;
24
+ import io .opentelemetry .api .trace .Span ;
25
+ import io .opentelemetry .context .Scope ;
23
26
import java .io .IOException ;
24
27
import java .util .concurrent .TimeUnit ;
25
28
@@ -128,7 +131,9 @@ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
128
131
this .settings = settings ;
129
132
this .stub =
130
133
EnhancedBigQueryReadStub .create (
131
- settings .getTypedStubSettings (), settings .getReadRowsRetryAttemptListener ());
134
+ settings .getTypedStubSettings (),
135
+ settings .getReadRowsRetryAttemptListener (),
136
+ settings .isOpenTelemetryEnabled ());
132
137
}
133
138
134
139
@ BetaApi ("A restructuring of stub classes is planned, so this may break in the future" )
@@ -229,7 +234,32 @@ public final ReadSession createReadSession(
229
234
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
230
235
*/
231
236
public final ReadSession createReadSession (CreateReadSessionRequest request ) {
232
- return createReadSessionCallable ().call (request );
237
+ Span createReadSession = null ;
238
+ if (settings .isOpenTelemetryEnabled ()) {
239
+ createReadSession =
240
+ settings
241
+ .getOpenTelemetryTracer ()
242
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.createReadSession" )
243
+ .setAttribute (
244
+ "bq.storage.read_session.request.parent" , getFieldAsString (request .getParent ()))
245
+ .setAttribute (
246
+ "bq.storage.read_session.request.max_stream_count" , request .getMaxStreamCount ())
247
+ .setAttribute (
248
+ "bq.storage.read_session.request.preferred_min_stream_count" ,
249
+ request .getPreferredMinStreamCount ())
250
+ .setAttribute (
251
+ "bq.storage.read_session.request.serialized_size" , request .getSerializedSize ())
252
+ .setAllAttributes (otelAttributesFrom (request .getReadSession ()))
253
+ .startSpan ();
254
+ }
255
+ try (Scope createReadSessionScope =
256
+ createReadSession != null ? createReadSession .makeCurrent () : null ) {
257
+ return createReadSessionCallable ().call (request );
258
+ } finally {
259
+ if (createReadSession != null ) {
260
+ createReadSession .end ();
261
+ }
262
+ }
233
263
}
234
264
235
265
/**
@@ -262,7 +292,22 @@ public final ReadSession createReadSession(CreateReadSessionRequest request) {
262
292
* </code></pre>
263
293
*/
264
294
public final UnaryCallable <CreateReadSessionRequest , ReadSession > createReadSessionCallable () {
265
- return stub .createReadSessionCallable ();
295
+ Span createReadSessionCallable = null ;
296
+ if (settings .isOpenTelemetryEnabled ()) {
297
+ createReadSessionCallable =
298
+ settings
299
+ .getOpenTelemetryTracer ()
300
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.createReadSessionCallable" )
301
+ .startSpan ();
302
+ }
303
+ try (Scope createReadSessionCallableScope =
304
+ createReadSessionCallable != null ? createReadSessionCallable .makeCurrent () : null ) {
305
+ return stub .createReadSessionCallable ();
306
+ } finally {
307
+ if (createReadSessionCallable != null ) {
308
+ createReadSessionCallable .end ();
309
+ }
310
+ }
266
311
}
267
312
268
313
/**
@@ -287,7 +332,22 @@ public final UnaryCallable<CreateReadSessionRequest, ReadSession> createReadSess
287
332
* </code></pre>
288
333
*/
289
334
public final ServerStreamingCallable <ReadRowsRequest , ReadRowsResponse > readRowsCallable () {
290
- return stub .readRowsCallable ();
335
+ Span readRowsCallable = null ;
336
+ if (settings .isOpenTelemetryEnabled ()) {
337
+ readRowsCallable =
338
+ settings
339
+ .getOpenTelemetryTracer ()
340
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.readRowsCallable" )
341
+ .startSpan ();
342
+ }
343
+ try (Scope readRowsCallableScope =
344
+ readRowsCallable != null ? readRowsCallable .makeCurrent () : null ) {
345
+ return stub .readRowsCallable ();
346
+ } finally {
347
+ if (readRowsCallable != null ) {
348
+ readRowsCallable .end ();
349
+ }
350
+ }
291
351
}
292
352
293
353
/**
@@ -315,7 +375,23 @@ public final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRows
315
375
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
316
376
*/
317
377
public final SplitReadStreamResponse splitReadStream (SplitReadStreamRequest request ) {
318
- return splitReadStreamCallable ().call (request );
378
+ Span splitReadStream = null ;
379
+ if (settings .isOpenTelemetryEnabled ()) {
380
+ splitReadStream =
381
+ settings
382
+ .getOpenTelemetryTracer ()
383
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.splitReadStream" )
384
+ .setAllAttributes (otelAttributesFrom (request ))
385
+ .startSpan ();
386
+ }
387
+ try (Scope splitReadStreamScope =
388
+ splitReadStream != null ? splitReadStream .makeCurrent () : null ) {
389
+ return splitReadStreamCallable ().call (request );
390
+ } finally {
391
+ if (splitReadStream != null ) {
392
+ splitReadStream .end ();
393
+ }
394
+ }
319
395
}
320
396
321
397
/**
@@ -343,17 +419,60 @@ public final SplitReadStreamResponse splitReadStream(SplitReadStreamRequest requ
343
419
*/
344
420
public final UnaryCallable <SplitReadStreamRequest , SplitReadStreamResponse >
345
421
splitReadStreamCallable () {
346
- return stub .splitReadStreamCallable ();
422
+ Span splitReadStreamCallable = null ;
423
+ if (settings .isOpenTelemetryEnabled ()) {
424
+ splitReadStreamCallable =
425
+ settings
426
+ .getOpenTelemetryTracer ()
427
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.splitReadStreamCallable" )
428
+ .startSpan ();
429
+ }
430
+ try (Scope readRowsCallableScope =
431
+ splitReadStreamCallable != null ? splitReadStreamCallable .makeCurrent () : null ) {
432
+ return stub .splitReadStreamCallable ();
433
+ } finally {
434
+ if (splitReadStreamCallable != null ) {
435
+ splitReadStreamCallable .end ();
436
+ }
437
+ }
347
438
}
348
439
349
440
@ Override
350
441
public final void close () {
351
- stub .close ();
442
+ Span close = null ;
443
+ if (settings .isOpenTelemetryEnabled ()) {
444
+ close =
445
+ settings
446
+ .getOpenTelemetryTracer ()
447
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.close" )
448
+ .startSpan ();
449
+ }
450
+ try (Scope closeScope = close != null ? close .makeCurrent () : null ) {
451
+ stub .close ();
452
+ } finally {
453
+ if (close != null ) {
454
+ close .end ();
455
+ }
456
+ }
352
457
}
353
458
354
459
@ Override
355
460
public void shutdown () {
356
- stub .shutdown ();
461
+ Span shutdown = null ;
462
+ if (settings .isOpenTelemetryEnabled ()) {
463
+ shutdown =
464
+ settings
465
+ .getOpenTelemetryTracer ()
466
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.shutdown" )
467
+ .startSpan ();
468
+ }
469
+ try (Scope shutdownScope = shutdown != null ? shutdown .makeCurrent () : null ) {
470
+ stub .shutdown ();
471
+ } finally {
472
+ if (shutdown != null ) {
473
+ shutdown .end ();
474
+ }
475
+ }
357
476
}
358
477
359
478
@ Override
@@ -368,11 +487,89 @@ public boolean isTerminated() {
368
487
369
488
@ Override
370
489
public void shutdownNow () {
371
- stub .shutdownNow ();
490
+ Span shutdownNow = null ;
491
+ if (settings .isOpenTelemetryEnabled ()) {
492
+ shutdownNow =
493
+ settings
494
+ .getOpenTelemetryTracer ()
495
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.shutdownNow" )
496
+ .startSpan ();
497
+ }
498
+ try (Scope shutdownNowScope = shutdownNow != null ? shutdownNow .makeCurrent () : null ) {
499
+ stub .shutdownNow ();
500
+ } finally {
501
+ if (shutdownNow != null ) {
502
+ shutdownNow .end ();
503
+ }
504
+ }
372
505
}
373
506
374
507
@ Override
375
508
public boolean awaitTermination (long duration , TimeUnit unit ) throws InterruptedException {
376
- return stub .awaitTermination (duration , unit );
509
+ Span awaitTermination = null ;
510
+ if (settings .isOpenTelemetryEnabled ()) {
511
+ awaitTermination =
512
+ settings
513
+ .getOpenTelemetryTracer ()
514
+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.awaitTermination" )
515
+ .setAttribute ("duration" , duration )
516
+ .setAttribute ("unit" , unit .toString ())
517
+ .startSpan ();
518
+ }
519
+ try (Scope awaitTerminationScope =
520
+ awaitTermination != null ? awaitTermination .makeCurrent () : null ) {
521
+ return stub .awaitTermination (duration , unit );
522
+ } finally {
523
+ if (awaitTermination != null ) {
524
+ awaitTermination .end ();
525
+ }
526
+ }
527
+ }
528
+
529
+ public void disableOpenTelemetryTracing () {
530
+ settings .setEnableOpenTelemetryTracing (false );
531
+ }
532
+
533
+ public void enableOpenTelemetryTracing () {
534
+ settings .setEnableOpenTelemetryTracing (true );
535
+ }
536
+
537
+ private static String getFieldAsString (Object field ) {
538
+ return field == null ? "null" : field .toString ();
539
+ }
540
+
541
+ private Attributes otelAttributesFrom (ReadSession readSession ) {
542
+ return Attributes .builder ()
543
+ .put ("bq.storage.read_session.name" , getFieldAsString (readSession .getName ()))
544
+ .put (
545
+ "bq.storage.read_session.data_format_value" ,
546
+ getFieldAsString (readSession .getDataFormatValue ()))
547
+ .put (
548
+ "bq.storage.read_session.serialized_size" ,
549
+ getFieldAsString (readSession .getSerializedSize ()))
550
+ .put ("bq.storage.read_session.table" , getFieldAsString (readSession .getTable ()))
551
+ .put ("bq.storage.read_session.estimated_row_count" , readSession .getEstimatedRowCount ())
552
+ .put (
553
+ "bq.storage.read_session.estimated_total_bytes_scanned" ,
554
+ readSession .getEstimatedTotalBytesScanned ())
555
+ .put (
556
+ "bq.storage.read_session.estimated_total_physical_bytes" ,
557
+ readSession .getEstimatedTotalPhysicalFileSize ())
558
+ .put ("bq.storage.read_session.streams_count" , readSession .getStreamsCount ())
559
+ .put ("bq.storage.read_session.trace_id" , getFieldAsString (readSession .getTraceId ()))
560
+ .put ("bq.storage.read_session.expire_time" , getFieldAsString (readSession .getExpireTime ()))
561
+ .build ();
562
+ }
563
+
564
+ private Attributes otelAttributesFrom (SplitReadStreamRequest request ) {
565
+ return Attributes .builder ()
566
+ .put ("bq.storage.split_read_stream_request.name" , getFieldAsString (request .getName ()))
567
+ .put (
568
+ "bq.storage.split_read_stream_request.serialized_size" ,
569
+ getFieldAsString (request .getSerializedSize ()))
570
+ .put (
571
+ "bq.storage.split_read_stream_request.fraction" ,
572
+ getFieldAsString (request .getFraction ()))
573
+ .build ();
377
574
}
378
575
}
0 commit comments