diff --git a/src/main/java/com/google/cloud/spanner/r2dbc/SpannerConnection.java b/src/main/java/com/google/cloud/spanner/r2dbc/SpannerConnection.java index 6df34b94..6baafe47 100644 --- a/src/main/java/com/google/cloud/spanner/r2dbc/SpannerConnection.java +++ b/src/main/java/com/google/cloud/spanner/r2dbc/SpannerConnection.java @@ -48,7 +48,7 @@ public class SpannerConnection implements Connection, StatementExecutionContext private final Client client; - private final Session session; + private Session session; private Transaction transaction; @@ -134,7 +134,10 @@ public Mono rollbackTransaction() { @Override public Mono close() { - return commitTransaction(false).then(this.client.deleteSession(this.getSessionName())); + return commitTransaction(false) + .then(this.client.deleteSession(this.getSessionName()).doOnSuccess(none -> { + this.session = null; + })); } @Override @@ -211,8 +214,11 @@ private void setTransaction( @Override public Publisher validate(ValidationDepth validationDepth) { - // TODO: https://github.com/GoogleCloudPlatform/cloud-spanner-r2dbc/issues/162 - return Mono.just(true); + if (validationDepth == ValidationDepth.LOCAL) { + return Mono.fromSupplier(() -> this.getSessionName() != null); + } + + return this.client.healthcheck(this); } diff --git a/src/main/java/com/google/cloud/spanner/r2dbc/client/Client.java b/src/main/java/com/google/cloud/spanner/r2dbc/client/Client.java index b13d5d5d..d1bcb418 100644 --- a/src/main/java/com/google/cloud/spanner/r2dbc/client/Client.java +++ b/src/main/java/com/google/cloud/spanner/r2dbc/client/Client.java @@ -141,4 +141,10 @@ Mono executeDdl( * @return a {@link Mono} that indicates that a client has been closed */ Mono close(); + + /** + * Validates session associated with the passed in {@link StatementExecutionContext}. + * @return {@link Mono} of whether the connection is working. + */ + Mono healthcheck(StatementExecutionContext ctx); } diff --git a/src/main/java/com/google/cloud/spanner/r2dbc/client/GrpcClient.java b/src/main/java/com/google/cloud/spanner/r2dbc/client/GrpcClient.java index 1c2d5a21..d9f00b2b 100644 --- a/src/main/java/com/google/cloud/spanner/r2dbc/client/GrpcClient.java +++ b/src/main/java/com/google/cloud/spanner/r2dbc/client/GrpcClient.java @@ -40,6 +40,7 @@ import com.google.spanner.v1.ExecuteBatchDmlRequest.Statement; import com.google.spanner.v1.ExecuteBatchDmlResponse; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.ExecuteSqlRequest.Builder; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.RollbackRequest; @@ -61,6 +62,8 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -81,13 +84,18 @@ public class GrpcClient implements Client { private static final String USER_AGENT_LIBRARY_NAME = "cloud-spanner-r2dbc"; + private static final String HEALTHCHECK_SQL = "SELECT 1"; + private static final int PORT = 443; + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class); + private final ManagedChannel channel; private final SpannerStub spanner; private final DatabaseAdminStub databaseAdmin; private final OperationsStub operations; + /** * Initializes the Cloud Spanner gRPC async stub. * @@ -260,10 +268,7 @@ public Flux executeStreamingSql( return Flux.defer(() -> { Assert.requireNonNull(ctx.getSessionName(), "Session name must not be null"); - ExecuteSqlRequest.Builder executeSqlRequest = - ExecuteSqlRequest.newBuilder() - .setSql(sql) - .setSession(ctx.getSessionName()); + ExecuteSqlRequest.Builder executeSqlRequest = buildSqlRequest(ctx, sql); if (params != null) { executeSqlRequest @@ -331,6 +336,30 @@ public Mono close() { }); } + @Override + public Mono healthcheck(StatementExecutionContext ctx) { + return Mono.defer(() -> { + if (ctx.getSessionName() == null) { + return Mono.just(false); + } + + return ObservableReactiveUtil.unaryCall( + obs -> this.spanner.executeSql(buildSqlRequest(ctx, HEALTHCHECK_SQL).build(), obs) + ) + .map(rs -> Boolean.TRUE) + .onErrorResume(error -> { + this.LOGGER.warn("Cloud Spanner healthcheck failed", error); + return Mono.just(Boolean.FALSE); + }); + }); + } + + private Builder buildSqlRequest(StatementExecutionContext ctx, String sql) { + return ExecuteSqlRequest.newBuilder() + .setSql(sql) + .setSession(ctx.getSessionName()); + } + @VisibleForTesting public SpannerStub getSpanner() { return this.spanner; diff --git a/src/test/java/com/google/cloud/spanner/r2dbc/SpannerConnectionTest.java b/src/test/java/com/google/cloud/spanner/r2dbc/SpannerConnectionTest.java index 66a676d3..aa79683e 100644 --- a/src/test/java/com/google/cloud/spanner/r2dbc/SpannerConnectionTest.java +++ b/src/test/java/com/google/cloud/spanner/r2dbc/SpannerConnectionTest.java @@ -45,6 +45,7 @@ import com.google.spanner.v1.Type; import com.google.spanner.v1.TypeCode; import io.r2dbc.spi.Statement; +import io.r2dbc.spi.ValidationDepth; import java.util.Collections; import java.util.Map; import org.junit.Before; @@ -312,7 +313,6 @@ public void turningAutocommitOffIsNoopWhenAlreadyOff() { verifyZeroInteractions(this.mockClient); } - @Test public void turningAutocommitOffWorksLocally() { SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null); @@ -350,20 +350,56 @@ public void turningAutocommitOnCommitsExistingTransaction() { @Test public void turningAutocommitOnDoesNotAffectNonReadwriteTransaction() { SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null); - TransactionOptions readonlyTransaction = TransactionOptions.newBuilder() - .setReadOnly(ReadOnly.getDefaultInstance()).build(); + TransactionOptions readonlyTransaction = + TransactionOptions.newBuilder().setReadOnly(ReadOnly.getDefaultInstance()).build(); StepVerifier.create( - Mono.from(connection.setAutoCommit(false)) - .then(connection.beginTransaction(readonlyTransaction)) - .then(Mono.from(connection.setAutoCommit(true))) - ).verifyComplete(); + Mono.from(connection.setAutoCommit(false)) + .then(connection.beginTransaction(readonlyTransaction)) + .then(Mono.from(connection.setAutoCommit(true)))) + .verifyComplete(); verify(this.mockClient).beginTransaction(TEST_SESSION_NAME, readonlyTransaction); verify(this.mockClient, times(0)).commitTransaction(eq(TEST_SESSION_NAME), any()); assertThat(connection.isAutoCommit()).isTrue(); } + @Test + public void localValidatePassesOnNewConnection() { + SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null); + StepVerifier.create(connection.validate(ValidationDepth.LOCAL)) + .expectNext(true) + .verifyComplete(); + verifyZeroInteractions(this.mockClient); + } + + @Test + public void localValidateFailsOnClosedConnection() { + when(this.mockClient.commitTransaction(any(), any())) + .thenReturn(Mono.just(CommitResponse.getDefaultInstance())); + when(this.mockClient.deleteSession(any())).thenReturn(Mono.empty()); + + SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null); + + StepVerifier.create( + connection.close() + .then(Mono.from(connection.validate(ValidationDepth.LOCAL)))) + .expectNext(false) + .verifyComplete(); + verify(this.mockClient, times(0)).healthcheck(any()); + } + + @Test + public void remoteValidateCallsServerHealthcheck() { + when(this.mockClient.healthcheck(any())).thenReturn(Mono.just(true)); + + SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null); + StepVerifier.create(connection.validate(ValidationDepth.REMOTE)) + .expectNext(true) + .verifyComplete(); + verify(this.mockClient).healthcheck(connection); + } + private PartialResultSet makeBookPrs(String bookName) { return PartialResultSet.newBuilder() .setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder() diff --git a/src/test/java/com/google/cloud/spanner/r2dbc/client/GrpcClientTest.java b/src/test/java/com/google/cloud/spanner/r2dbc/client/GrpcClientTest.java index ddb3b542..62ef45a1 100644 --- a/src/test/java/com/google/cloud/spanner/r2dbc/client/GrpcClientTest.java +++ b/src/test/java/com/google/cloud/spanner/r2dbc/client/GrpcClientTest.java @@ -182,6 +182,30 @@ public void testUserAgentConfig() (String) userAgentField.get(deletegateField.get(channel)))); } + @Test + public void testHealthcheck() throws IOException { + String sql = "SELECT 1"; + SpannerImplBase spannerSpy = doTest(new SpannerImplBase() { + @Override + public void executeSql(ExecuteSqlRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(ResultSet.newBuilder().build()); + responseObserver.onCompleted(); + } + }, + // call the method under test + grpcClient -> grpcClient.healthcheck(this.mockContext).block()); + + // verify the service was called correctly + ArgumentCaptor requestCaptor = ArgumentCaptor + .forClass(ExecuteSqlRequest.class); + verify(spannerSpy).executeSql(requestCaptor.capture(), any()); + assertEquals(sql, requestCaptor.getValue().getSql()); + assertEquals(SESSION_NAME, requestCaptor.getValue().getSession()); + assertEquals(ByteString.EMPTY, requestCaptor.getValue().getTransaction().getId()); + + } + /** * Starts and shuts down an in-process gRPC service based on the {@code serviceImpl} provided, * while allowing a test to execute using the {@link GrpcClient}. diff --git a/src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerExample.java b/src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerExample.java index d012b373..15903c88 100644 --- a/src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerExample.java +++ b/src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerExample.java @@ -31,8 +31,6 @@ import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; -import io.r2dbc.pool.ConnectionPool; -import io.r2dbc.pool.ConnectionPoolConfiguration; import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactories; import io.r2dbc.spi.ConnectionFactory; @@ -41,7 +39,6 @@ import io.r2dbc.spi.Statement; import io.r2dbc.spi.test.TestKit; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -72,13 +69,6 @@ public class SpannerExample implements TestKit { .option(DATABASE, TEST_DATABASE) .build()); - private static final ConnectionPool pool = - new ConnectionPool(ConnectionPoolConfiguration.builder(connectionFactory) - .validationQuery("SELECT 1") - .maxIdleTime(Duration.ofSeconds(10)) - .maxSize(15) - .build()); - private static final Logger logger = LoggerFactory.getLogger(SpannerExample.class); private static final JdbcOperations jdbcOperations; @@ -130,12 +120,14 @@ private static void runDdl(DatabaseId id, DatabaseAdminClient dbAdminClient, Str Collections.singletonList(query), null).get(); } catch (Exception e) { - logger.info("Couldn't run DDL", e); + if (!e.getMessage().contains("Duplicate name in schema")) { + logger.info("Couldn't run DDL", e); + } } } private static void executeDml(Function statementFunc) { - Mono.from(pool.create()) + Mono.from(connectionFactory.create()) .delayUntil(c -> c.beginTransaction()) .delayUntil(c -> Flux.from(statementFunc.apply(c).execute()) .flatMapSequential(r -> Mono.from(r.getRowsUpdated()))) @@ -152,7 +144,7 @@ private static Mono close(Connection connection) { @Override public ConnectionFactory getConnectionFactory() { - return pool; + return connectionFactory; } // we don't need to create tables because it is slow. we do it upfront.