Skip to content

feat: add option for multiplexed sessions with partitioned operations #3635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 20, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:
env:
JOB_TYPE: test
SPANNER_EMULATOR_HOST: localhost:9010
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public boolean getUseMultiplexedSessionForRW() {
@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionPartitionedOps() {
return useMultiplexedSessionForPartitionedOps;
return getUseMultiplexedSession() && useMultiplexedSessionForPartitionedOps;
}

private static Boolean getUseMultiplexedSessionFromEnvVariable() {
Expand All @@ -370,9 +370,7 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() {
@VisibleForTesting
@InternalApi
protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() {
// Checks the value of env, GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS
// This returns null until Partitioned Operations is supported.
return null;
return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS");
}

private static Boolean parseBooleanEnvVariable(String variableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void setUp() {
@SuppressWarnings("resource")
SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions);
client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession);
BatchClientImpl.unimplementedForPartitionedOps.set(false);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ boolean isMultiplexedSessionsEnabled(Spanner spanner) {
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}

boolean isMultiplexedSessionsEnabledForPartitionedOps(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps();
}

boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public void testPartitionQuery() {
assertFalse(resultSet.next());
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
Expand Down Expand Up @@ -155,7 +157,9 @@ public void testMixNormalAndPartitionQueryInReadOnlyTransaction() {
readTimestamps.add(connection.getReadTimestamp());
connection.commit();
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
Expand Down Expand Up @@ -228,6 +232,10 @@ public void testRunPartition() {
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
isMultiplexedSessionCreated = true;
}
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
&& isMultiplexedSessionCreated) {
// When multiplexed session will be reused for each iteration.
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(
expectedCreateSessionsRPC,
Expand Down Expand Up @@ -261,6 +269,7 @@ public void testRunPartitionUsingSql() {
String prefix = dialect == Dialect.POSTGRESQL ? "spanner." : "";

int maxPartitions = 5;
boolean isMultiplexedSessionCreated = false;
try (Connection connection = createConnection()) {
connection.execute(Statement.of("set autocommit=true"));
assertTrue(connection.isAutocommit());
Expand All @@ -284,7 +293,6 @@ public void testRunPartitionUsingSql() {
assertFalse(resultSet.next());
}

boolean isMultiplexedSessionCreated = false;
for (boolean useLiteral : new boolean[] {true, false}) {
try (ResultSet partitions =
connection.executeQuery(
Expand Down Expand Up @@ -328,6 +336,10 @@ public void testRunPartitionUsingSql() {
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
isMultiplexedSessionCreated = true;
}
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
&& isMultiplexedSessionCreated) {
// When multiplexed session will be reused for each iteration.
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(
expectedCreateSessionsRPC,
Expand Down Expand Up @@ -570,7 +582,9 @@ public void testRunPartitionedQueryUsingSql() {
assertEquals(maxPartitions * generatedRowCount, rowCount);
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
Expand Down Expand Up @@ -679,7 +693,9 @@ public void testRunPartitionedQueryWithMaxParallelism() {
assertEquals(maxPartitions * generatedRowCount, rowCount);
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(6, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(5, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
Expand Down Expand Up @@ -758,7 +774,10 @@ public void testAutoPartitionMode() {
exception
.getMessage()
.contains("Partition query is not supported for read/write transaction"));
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {

if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
Expand Down
Loading