@@ -1063,7 +1063,7 @@ ResultSet executeQueryInternalWithOptions(
1063
1063
final int prefetchChunks =
1064
1064
readOptions .hasPrefetchChunks () ? readOptions .prefetchChunks () : defaultPrefetchChunks ;
1065
1065
ResumableStreamIterator stream =
1066
- new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , QUERY ) {
1066
+ new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , QUERY , span ) {
1067
1067
@ Override
1068
1068
CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken ) {
1069
1069
GrpcStreamIterator stream = new GrpcStreamIterator (prefetchChunks );
@@ -1176,7 +1176,7 @@ ResultSet readInternalWithOptions(
1176
1176
final int prefetchChunks =
1177
1177
readOptions .hasPrefetchChunks () ? readOptions .prefetchChunks () : defaultPrefetchChunks ;
1178
1178
ResumableStreamIterator stream =
1179
- new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , READ ) {
1179
+ new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , READ , span ) {
1180
1180
@ Override
1181
1181
CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken ) {
1182
1182
GrpcStreamIterator stream = new GrpcStreamIterator (prefetchChunks );
@@ -1426,7 +1426,7 @@ void commit() {
1426
1426
mutations = null ;
1427
1427
}
1428
1428
final CommitRequest commitRequest = builder .build ();
1429
- Span opSpan = tracer .spanBuilder (COMMIT ).startSpan ();
1429
+ Span opSpan = tracer .spanBuilderWithExplicitParent (COMMIT , span ).startSpan ();
1430
1430
try (Scope s = tracer .withSpan (opSpan )) {
1431
1431
CommitResponse commitResponse =
1432
1432
runWithRetries (
@@ -2452,20 +2452,20 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
2452
2452
*/
2453
2453
private boolean safeToRetry = true ;
2454
2454
2455
- protected ResumableStreamIterator (int maxBufferSize , String streamName ) {
2455
+ protected ResumableStreamIterator (int maxBufferSize , String streamName , Span parent ) {
2456
2456
checkArgument (maxBufferSize >= 0 );
2457
2457
this .maxBufferSize = maxBufferSize ;
2458
- this .span = tracer .spanBuilder (streamName ).startSpan ();
2458
+ this .span = tracer .spanBuilderWithExplicitParent (streamName , parent ).startSpan ();
2459
2459
}
2460
2460
2461
2461
abstract CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken );
2462
2462
2463
2463
@ Override
2464
2464
public void close (@ Nullable String message ) {
2465
- span .end ();
2466
2465
if (stream != null ) {
2467
2466
stream .close (message );
2468
2467
}
2468
+ span .end ();
2469
2469
}
2470
2470
2471
2471
@ Override
@@ -2478,7 +2478,11 @@ protected PartialResultSet computeNext() {
2478
2478
ImmutableMap .of ("ResumeToken" ,
2479
2479
AttributeValue .stringAttributeValue (
2480
2480
resumeToken == null ? "null" : resumeToken .toStringUtf8 ())));
2481
- stream = checkNotNull (startStream (resumeToken ));
2481
+ try (Scope s = tracer .withSpan (span )) {
2482
+ // When start a new stream set the Span as current to make the gRPC Span a child of
2483
+ // this Span.
2484
+ stream = checkNotNull (startStream (resumeToken ));
2485
+ }
2482
2486
}
2483
2487
// Buffer contains items up to a resume token or has reached capacity: flush.
2484
2488
if (!buffer .isEmpty ()
0 commit comments