Skip to content

Commit 107c976

Browse files
authored
Implement connection validation (#173)
### Behavior change: * Null out session when connection is closed. * Local validation relies on having a valid session (it checks for session name, but that's only going to be non-null if and only if the connection is non-null.) * Remote validation is implemented as a low-level `GrpcClient` healthcheck. It sends a trivial query to Spanner over `executeSql` (as opposed to `executeStreamingSql`). ### TCK test change: * Removed session pools from TCK tests * Suppressed error logging when already-existing tables try to get created. This operation also takes forever; need to check whether pre-querying tables will be faster. Fixes #162 .
1 parent 99df5ce commit 107c976

File tree

6 files changed

+121
-28
lines changed

6 files changed

+121
-28
lines changed

src/main/java/com/google/cloud/spanner/r2dbc/SpannerConnection.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class SpannerConnection implements Connection, StatementExecutionContext
4848

4949
private final Client client;
5050

51-
private final Session session;
51+
private Session session;
5252

5353
private Transaction transaction;
5454

@@ -134,7 +134,10 @@ public Mono<Void> rollbackTransaction() {
134134

135135
@Override
136136
public Mono<Void> close() {
137-
return commitTransaction(false).then(this.client.deleteSession(this.getSessionName()));
137+
return commitTransaction(false)
138+
.then(this.client.deleteSession(this.getSessionName()).doOnSuccess(none -> {
139+
this.session = null;
140+
}));
138141
}
139142

140143
@Override
@@ -211,8 +214,11 @@ private void setTransaction(
211214

212215
@Override
213216
public Publisher<Boolean> validate(ValidationDepth validationDepth) {
214-
// TODO: https://github.com/GoogleCloudPlatform/cloud-spanner-r2dbc/issues/162
215-
return Mono.just(true);
217+
if (validationDepth == ValidationDepth.LOCAL) {
218+
return Mono.fromSupplier(() -> this.getSessionName() != null);
219+
}
220+
221+
return this.client.healthcheck(this);
216222
}
217223

218224

src/main/java/com/google/cloud/spanner/r2dbc/client/Client.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,10 @@ Mono<Operation> executeDdl(
141141
* @return a {@link Mono} that indicates that a client has been closed
142142
*/
143143
Mono<Void> close();
144+
145+
/**
146+
* Validates session associated with the passed in {@link StatementExecutionContext}.
147+
* @return {@link Mono} of whether the connection is working.
148+
*/
149+
Mono<Boolean> healthcheck(StatementExecutionContext ctx);
144150
}

src/main/java/com/google/cloud/spanner/r2dbc/client/GrpcClient.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.google.spanner.v1.ExecuteBatchDmlRequest.Statement;
4141
import com.google.spanner.v1.ExecuteBatchDmlResponse;
4242
import com.google.spanner.v1.ExecuteSqlRequest;
43+
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
4344
import com.google.spanner.v1.PartialResultSet;
4445
import com.google.spanner.v1.ResultSet;
4546
import com.google.spanner.v1.RollbackRequest;
@@ -61,6 +62,8 @@
6162
import java.time.Duration;
6263
import java.util.List;
6364
import java.util.Map;
65+
import org.slf4j.Logger;
66+
import org.slf4j.LoggerFactory;
6467
import reactor.core.publisher.Flux;
6568
import reactor.core.publisher.Mono;
6669

@@ -81,13 +84,18 @@ public class GrpcClient implements Client {
8184

8285
private static final String USER_AGENT_LIBRARY_NAME = "cloud-spanner-r2dbc";
8386

87+
private static final String HEALTHCHECK_SQL = "SELECT 1";
88+
8489
private static final int PORT = 443;
8590

91+
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
92+
8693
private final ManagedChannel channel;
8794
private final SpannerStub spanner;
8895
private final DatabaseAdminStub databaseAdmin;
8996
private final OperationsStub operations;
9097

98+
9199
/**
92100
* Initializes the Cloud Spanner gRPC async stub.
93101
*
@@ -260,10 +268,7 @@ public Flux<PartialResultSet> executeStreamingSql(
260268
return Flux.defer(() -> {
261269
Assert.requireNonNull(ctx.getSessionName(), "Session name must not be null");
262270

263-
ExecuteSqlRequest.Builder executeSqlRequest =
264-
ExecuteSqlRequest.newBuilder()
265-
.setSql(sql)
266-
.setSession(ctx.getSessionName());
271+
ExecuteSqlRequest.Builder executeSqlRequest = buildSqlRequest(ctx, sql);
267272

268273
if (params != null) {
269274
executeSqlRequest
@@ -331,6 +336,30 @@ public Mono<Void> close() {
331336
});
332337
}
333338

339+
@Override
340+
public Mono<Boolean> healthcheck(StatementExecutionContext ctx) {
341+
return Mono.defer(() -> {
342+
if (ctx.getSessionName() == null) {
343+
return Mono.just(false);
344+
}
345+
346+
return ObservableReactiveUtil.<ResultSet>unaryCall(
347+
obs -> this.spanner.executeSql(buildSqlRequest(ctx, HEALTHCHECK_SQL).build(), obs)
348+
)
349+
.map(rs -> Boolean.TRUE)
350+
.onErrorResume(error -> {
351+
this.LOGGER.warn("Cloud Spanner healthcheck failed", error);
352+
return Mono.just(Boolean.FALSE);
353+
});
354+
});
355+
}
356+
357+
private Builder buildSqlRequest(StatementExecutionContext ctx, String sql) {
358+
return ExecuteSqlRequest.newBuilder()
359+
.setSql(sql)
360+
.setSession(ctx.getSessionName());
361+
}
362+
334363
@VisibleForTesting
335364
public SpannerStub getSpanner() {
336365
return this.spanner;

src/test/java/com/google/cloud/spanner/r2dbc/SpannerConnectionTest.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.google.spanner.v1.Type;
4646
import com.google.spanner.v1.TypeCode;
4747
import io.r2dbc.spi.Statement;
48+
import io.r2dbc.spi.ValidationDepth;
4849
import java.util.Collections;
4950
import java.util.Map;
5051
import org.junit.Before;
@@ -312,7 +313,6 @@ public void turningAutocommitOffIsNoopWhenAlreadyOff() {
312313
verifyZeroInteractions(this.mockClient);
313314
}
314315

315-
316316
@Test
317317
public void turningAutocommitOffWorksLocally() {
318318
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
@@ -350,20 +350,56 @@ public void turningAutocommitOnCommitsExistingTransaction() {
350350
@Test
351351
public void turningAutocommitOnDoesNotAffectNonReadwriteTransaction() {
352352
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
353-
TransactionOptions readonlyTransaction = TransactionOptions.newBuilder()
354-
.setReadOnly(ReadOnly.getDefaultInstance()).build();
353+
TransactionOptions readonlyTransaction =
354+
TransactionOptions.newBuilder().setReadOnly(ReadOnly.getDefaultInstance()).build();
355355

356356
StepVerifier.create(
357-
Mono.from(connection.setAutoCommit(false))
358-
.then(connection.beginTransaction(readonlyTransaction))
359-
.then(Mono.from(connection.setAutoCommit(true)))
360-
).verifyComplete();
357+
Mono.from(connection.setAutoCommit(false))
358+
.then(connection.beginTransaction(readonlyTransaction))
359+
.then(Mono.from(connection.setAutoCommit(true))))
360+
.verifyComplete();
361361

362362
verify(this.mockClient).beginTransaction(TEST_SESSION_NAME, readonlyTransaction);
363363
verify(this.mockClient, times(0)).commitTransaction(eq(TEST_SESSION_NAME), any());
364364
assertThat(connection.isAutoCommit()).isTrue();
365365
}
366366

367+
@Test
368+
public void localValidatePassesOnNewConnection() {
369+
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
370+
StepVerifier.create(connection.validate(ValidationDepth.LOCAL))
371+
.expectNext(true)
372+
.verifyComplete();
373+
verifyZeroInteractions(this.mockClient);
374+
}
375+
376+
@Test
377+
public void localValidateFailsOnClosedConnection() {
378+
when(this.mockClient.commitTransaction(any(), any()))
379+
.thenReturn(Mono.just(CommitResponse.getDefaultInstance()));
380+
when(this.mockClient.deleteSession(any())).thenReturn(Mono.empty());
381+
382+
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
383+
384+
StepVerifier.create(
385+
connection.close()
386+
.then(Mono.from(connection.validate(ValidationDepth.LOCAL))))
387+
.expectNext(false)
388+
.verifyComplete();
389+
verify(this.mockClient, times(0)).healthcheck(any());
390+
}
391+
392+
@Test
393+
public void remoteValidateCallsServerHealthcheck() {
394+
when(this.mockClient.healthcheck(any())).thenReturn(Mono.just(true));
395+
396+
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
397+
StepVerifier.create(connection.validate(ValidationDepth.REMOTE))
398+
.expectNext(true)
399+
.verifyComplete();
400+
verify(this.mockClient).healthcheck(connection);
401+
}
402+
367403
private PartialResultSet makeBookPrs(String bookName) {
368404
return PartialResultSet.newBuilder()
369405
.setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder()

src/test/java/com/google/cloud/spanner/r2dbc/client/GrpcClientTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,30 @@ public void testUserAgentConfig()
182182
(String) userAgentField.get(deletegateField.get(channel))));
183183
}
184184

185+
@Test
186+
public void testHealthcheck() throws IOException {
187+
String sql = "SELECT 1";
188+
SpannerImplBase spannerSpy = doTest(new SpannerImplBase() {
189+
@Override
190+
public void executeSql(ExecuteSqlRequest request,
191+
StreamObserver<ResultSet> responseObserver) {
192+
responseObserver.onNext(ResultSet.newBuilder().build());
193+
responseObserver.onCompleted();
194+
}
195+
},
196+
// call the method under test
197+
grpcClient -> grpcClient.healthcheck(this.mockContext).block());
198+
199+
// verify the service was called correctly
200+
ArgumentCaptor<ExecuteSqlRequest> requestCaptor = ArgumentCaptor
201+
.forClass(ExecuteSqlRequest.class);
202+
verify(spannerSpy).executeSql(requestCaptor.capture(), any());
203+
assertEquals(sql, requestCaptor.getValue().getSql());
204+
assertEquals(SESSION_NAME, requestCaptor.getValue().getSession());
205+
assertEquals(ByteString.EMPTY, requestCaptor.getValue().getTransaction().getId());
206+
207+
}
208+
185209
/**
186210
* Starts and shuts down an in-process gRPC service based on the {@code serviceImpl} provided,
187211
* while allowing a test to execute using the {@link GrpcClient}.

src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerExample.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import com.google.cloud.spanner.DatabaseId;
3232
import com.google.cloud.spanner.Spanner;
3333
import com.google.cloud.spanner.SpannerOptions;
34-
import io.r2dbc.pool.ConnectionPool;
35-
import io.r2dbc.pool.ConnectionPoolConfiguration;
3634
import io.r2dbc.spi.Connection;
3735
import io.r2dbc.spi.ConnectionFactories;
3836
import io.r2dbc.spi.ConnectionFactory;
@@ -41,7 +39,6 @@
4139
import io.r2dbc.spi.Statement;
4240
import io.r2dbc.spi.test.TestKit;
4341
import java.nio.charset.StandardCharsets;
44-
import java.time.Duration;
4542
import java.util.Arrays;
4643
import java.util.Collection;
4744
import java.util.Collections;
@@ -72,13 +69,6 @@ public class SpannerExample implements TestKit<String> {
7269
.option(DATABASE, TEST_DATABASE)
7370
.build());
7471

75-
private static final ConnectionPool pool =
76-
new ConnectionPool(ConnectionPoolConfiguration.builder(connectionFactory)
77-
.validationQuery("SELECT 1")
78-
.maxIdleTime(Duration.ofSeconds(10))
79-
.maxSize(15)
80-
.build());
81-
8272
private static final Logger logger = LoggerFactory.getLogger(SpannerExample.class);
8373

8474
private static final JdbcOperations jdbcOperations;
@@ -130,12 +120,14 @@ private static void runDdl(DatabaseId id, DatabaseAdminClient dbAdminClient, Str
130120
Collections.singletonList(query),
131121
null).get();
132122
} catch (Exception e) {
133-
logger.info("Couldn't run DDL", e);
123+
if (!e.getMessage().contains("Duplicate name in schema")) {
124+
logger.info("Couldn't run DDL", e);
125+
}
134126
}
135127
}
136128

137129
private static void executeDml(Function<Connection, Statement> statementFunc) {
138-
Mono.from(pool.create())
130+
Mono.from(connectionFactory.create())
139131
.delayUntil(c -> c.beginTransaction())
140132
.delayUntil(c -> Flux.from(statementFunc.apply(c).execute())
141133
.flatMapSequential(r -> Mono.from(r.getRowsUpdated())))
@@ -152,7 +144,7 @@ private static <T> Mono<T> close(Connection connection) {
152144

153145
@Override
154146
public ConnectionFactory getConnectionFactory() {
155-
return pool;
147+
return connectionFactory;
156148
}
157149

158150
// we don't need to create tables because it is slow. we do it upfront.

0 commit comments

Comments
 (0)