Skip to content

Implement connection validation #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SpannerConnection implements Connection, StatementExecutionContext

private final Client client;

private final Session session;
private Session session;

private Transaction transaction;

Expand Down Expand Up @@ -134,7 +134,10 @@ public Mono<Void> rollbackTransaction() {

@Override
public Mono<Void> close() {
return commitTransaction(false).then(this.client.deleteSession(this.getSessionName()));
return commitTransaction(false)
.then(this.client.deleteSession(this.getSessionName()).doOnSuccess(none -> {
this.session = null;
}));
}

@Override
Expand Down Expand Up @@ -211,8 +214,11 @@ private void setTransaction(

@Override
public Publisher<Boolean> validate(ValidationDepth validationDepth) {
// TODO: https://github.com/GoogleCloudPlatform/cloud-spanner-r2dbc/issues/162
return Mono.just(true);
if (validationDepth == ValidationDepth.LOCAL) {
return Mono.fromSupplier(() -> this.getSessionName() != null);
}

return this.client.healthcheck(this);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,10 @@ Mono<Operation> executeDdl(
* @return a {@link Mono} that indicates that a client has been closed
*/
Mono<Void> close();

/**
* Validates session associated with the passed in {@link StatementExecutionContext}.
* @return {@link Mono} of whether the connection is working.
*/
Mono<Boolean> healthcheck(StatementExecutionContext ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.spanner.v1.ExecuteBatchDmlRequest.Statement;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
Expand All @@ -61,6 +62,8 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -81,13 +84,18 @@ public class GrpcClient implements Client {

private static final String USER_AGENT_LIBRARY_NAME = "cloud-spanner-r2dbc";

private static final String HEALTHCHECK_SQL = "SELECT 1";

private static final int PORT = 443;

private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);

private final ManagedChannel channel;
private final SpannerStub spanner;
private final DatabaseAdminStub databaseAdmin;
private final OperationsStub operations;


/**
* Initializes the Cloud Spanner gRPC async stub.
*
Expand Down Expand Up @@ -260,10 +268,7 @@ public Flux<PartialResultSet> executeStreamingSql(
return Flux.defer(() -> {
Assert.requireNonNull(ctx.getSessionName(), "Session name must not be null");

ExecuteSqlRequest.Builder executeSqlRequest =
ExecuteSqlRequest.newBuilder()
.setSql(sql)
.setSession(ctx.getSessionName());
ExecuteSqlRequest.Builder executeSqlRequest = buildSqlRequest(ctx, sql);

if (params != null) {
executeSqlRequest
Expand Down Expand Up @@ -331,6 +336,30 @@ public Mono<Void> close() {
});
}

@Override
public Mono<Boolean> healthcheck(StatementExecutionContext ctx) {
return Mono.defer(() -> {
if (ctx.getSessionName() == null) {
return Mono.just(false);
}

return ObservableReactiveUtil.<ResultSet>unaryCall(
obs -> this.spanner.executeSql(buildSqlRequest(ctx, HEALTHCHECK_SQL).build(), obs)
)
.map(rs -> Boolean.TRUE)
.onErrorResume(error -> {
this.LOGGER.warn("Cloud Spanner healthcheck failed", error);
return Mono.just(Boolean.FALSE);
});
});
}

private Builder buildSqlRequest(StatementExecutionContext ctx, String sql) {
return ExecuteSqlRequest.newBuilder()
.setSql(sql)
.setSession(ctx.getSessionName());
}

@VisibleForTesting
public SpannerStub getSpanner() {
return this.spanner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.ValidationDepth;
import java.util.Collections;
import java.util.Map;
import org.junit.Before;
Expand Down Expand Up @@ -312,7 +313,6 @@ public void turningAutocommitOffIsNoopWhenAlreadyOff() {
verifyZeroInteractions(this.mockClient);
}


@Test
public void turningAutocommitOffWorksLocally() {
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
Expand Down Expand Up @@ -350,20 +350,56 @@ public void turningAutocommitOnCommitsExistingTransaction() {
@Test
public void turningAutocommitOnDoesNotAffectNonReadwriteTransaction() {
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
TransactionOptions readonlyTransaction = TransactionOptions.newBuilder()
.setReadOnly(ReadOnly.getDefaultInstance()).build();
TransactionOptions readonlyTransaction =
TransactionOptions.newBuilder().setReadOnly(ReadOnly.getDefaultInstance()).build();

StepVerifier.create(
Mono.from(connection.setAutoCommit(false))
.then(connection.beginTransaction(readonlyTransaction))
.then(Mono.from(connection.setAutoCommit(true)))
).verifyComplete();
Mono.from(connection.setAutoCommit(false))
.then(connection.beginTransaction(readonlyTransaction))
.then(Mono.from(connection.setAutoCommit(true))))
.verifyComplete();

verify(this.mockClient).beginTransaction(TEST_SESSION_NAME, readonlyTransaction);
verify(this.mockClient, times(0)).commitTransaction(eq(TEST_SESSION_NAME), any());
assertThat(connection.isAutoCommit()).isTrue();
}

@Test
public void localValidatePassesOnNewConnection() {
SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
StepVerifier.create(connection.validate(ValidationDepth.LOCAL))
.expectNext(true)
.verifyComplete();
verifyZeroInteractions(this.mockClient);
}

@Test
public void localValidateFailsOnClosedConnection() {
when(this.mockClient.commitTransaction(any(), any()))
.thenReturn(Mono.just(CommitResponse.getDefaultInstance()));
when(this.mockClient.deleteSession(any())).thenReturn(Mono.empty());

SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);

StepVerifier.create(
connection.close()
.then(Mono.from(connection.validate(ValidationDepth.LOCAL))))
.expectNext(false)
.verifyComplete();
verify(this.mockClient, times(0)).healthcheck(any());
}

@Test
public void remoteValidateCallsServerHealthcheck() {
when(this.mockClient.healthcheck(any())).thenReturn(Mono.just(true));

SpannerConnection connection = new SpannerConnection(this.mockClient, TEST_SESSION, null);
StepVerifier.create(connection.validate(ValidationDepth.REMOTE))
.expectNext(true)
.verifyComplete();
verify(this.mockClient).healthcheck(connection);
}

private PartialResultSet makeBookPrs(String bookName) {
return PartialResultSet.newBuilder()
.setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,30 @@ public void testUserAgentConfig()
(String) userAgentField.get(deletegateField.get(channel))));
}

@Test
public void testHealthcheck() throws IOException {
String sql = "SELECT 1";
SpannerImplBase spannerSpy = doTest(new SpannerImplBase() {
@Override
public void executeSql(ExecuteSqlRequest request,
StreamObserver<ResultSet> responseObserver) {
responseObserver.onNext(ResultSet.newBuilder().build());
responseObserver.onCompleted();
}
},
// call the method under test
grpcClient -> grpcClient.healthcheck(this.mockContext).block());

// verify the service was called correctly
ArgumentCaptor<ExecuteSqlRequest> requestCaptor = ArgumentCaptor
.forClass(ExecuteSqlRequest.class);
verify(spannerSpy).executeSql(requestCaptor.capture(), any());
assertEquals(sql, requestCaptor.getValue().getSql());
assertEquals(SESSION_NAME, requestCaptor.getValue().getSession());
assertEquals(ByteString.EMPTY, requestCaptor.getValue().getTransaction().getId());

}

/**
* Starts and shuts down an in-process gRPC service based on the {@code serviceImpl} provided,
* while allowing a test to execute using the {@link GrpcClient}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
Expand All @@ -41,7 +39,6 @@
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.test.TestKit;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -72,13 +69,6 @@ public class SpannerExample implements TestKit<String> {
.option(DATABASE, TEST_DATABASE)
.build());

private static final ConnectionPool pool =
new ConnectionPool(ConnectionPoolConfiguration.builder(connectionFactory)
.validationQuery("SELECT 1")
.maxIdleTime(Duration.ofSeconds(10))
.maxSize(15)
.build());

private static final Logger logger = LoggerFactory.getLogger(SpannerExample.class);

private static final JdbcOperations jdbcOperations;
Expand Down Expand Up @@ -130,12 +120,14 @@ private static void runDdl(DatabaseId id, DatabaseAdminClient dbAdminClient, Str
Collections.singletonList(query),
null).get();
} catch (Exception e) {
logger.info("Couldn't run DDL", e);
if (!e.getMessage().contains("Duplicate name in schema")) {
logger.info("Couldn't run DDL", e);
}
}
}

private static void executeDml(Function<Connection, Statement> statementFunc) {
Mono.from(pool.create())
Mono.from(connectionFactory.create())
.delayUntil(c -> c.beginTransaction())
.delayUntil(c -> Flux.from(statementFunc.apply(c).execute())
.flatMapSequential(r -> Mono.from(r.getRowsUpdated())))
Expand All @@ -152,7 +144,7 @@ private static <T> Mono<T> close(Connection connection) {

@Override
public ConnectionFactory getConnectionFactory() {
return pool;
return connectionFactory;
}

// we don't need to create tables because it is slow. we do it upfront.
Expand Down