|
21 | 21 | import io.netty.util.ReferenceCountUtil;
|
22 | 22 | import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
|
23 | 23 | import io.r2dbc.postgresql.PostgresqlConnectionFactory;
|
| 24 | +import io.r2dbc.postgresql.api.ErrorDetails; |
24 | 25 | import io.r2dbc.postgresql.api.PostgresqlConnection;
|
| 26 | +import io.r2dbc.postgresql.api.PostgresqlException; |
25 | 27 | import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
|
26 | 28 | import io.r2dbc.postgresql.message.Format;
|
27 | 29 | import io.r2dbc.postgresql.message.backend.BackendMessage;
|
|
37 | 39 | import io.r2dbc.postgresql.message.frontend.FrontendMessage;
|
38 | 40 | import io.r2dbc.postgresql.message.frontend.Query;
|
39 | 41 | import io.r2dbc.postgresql.message.frontend.Sync;
|
| 42 | +import io.r2dbc.postgresql.util.PgBouncer; |
40 | 43 | import io.r2dbc.postgresql.util.PostgresqlServerExtension;
|
| 44 | +import io.r2dbc.spi.R2dbcBadGrammarException; |
41 | 45 | import io.r2dbc.spi.R2dbcNonTransientResourceException;
|
42 | 46 | import io.r2dbc.spi.R2dbcPermissionDeniedException;
|
43 | 47 | import org.junit.jupiter.api.AfterEach;
|
@@ -394,29 +398,77 @@ public boolean verify(String s, SSLSession sslSession) {
|
394 | 398 |
|
395 | 399 | @Nested
|
396 | 400 | @TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
397 |
| - final class StatementCacheSizeTests { |
| 401 | + final class PgBouncerTests { |
398 | 402 |
|
399 | 403 | @ParameterizedTest
|
400 |
| - @ValueSource(ints = {0, 2, -1}) |
401 |
| - void multiplePreparedStatementsTest(int statementCacheSize) { |
402 |
| - PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(statementCacheSize); |
| 404 | + @ValueSource(strings = {"transaction", "statement"}) |
| 405 | + void disabledCacheWorksWithTransactionAndStatementModes(String poolMode) { |
| 406 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 407 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, 0); |
| 408 | + |
| 409 | + connectionFactory.create().flatMapMany(connection -> { |
| 410 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 411 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 412 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 413 | + |
| 414 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 415 | + }) |
| 416 | + .as(StepVerifier::create) |
| 417 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 418 | + .verifyComplete(); |
| 419 | + } |
| 420 | + } |
403 | 421 |
|
404 |
| - connectionFactory.create().flatMapMany(connection -> { |
405 |
| - Flux<Integer> firstQuery = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
406 |
| - Flux<Integer> secondQuery = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
407 |
| - Flux<Integer> thirdQuery = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 422 | + @ParameterizedTest |
| 423 | + @ValueSource(ints = {-1, 0, 2}) |
| 424 | + void sessionModeWorksWithAllCaches(int statementCacheSize) { |
| 425 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, "session")) { |
| 426 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, statementCacheSize); |
| 427 | + |
| 428 | + connectionFactory.create().flatMapMany(connection -> { |
| 429 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 430 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 431 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 432 | + |
| 433 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 434 | + }) |
| 435 | + .as(StepVerifier::create) |
| 436 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 437 | + .verifyComplete(); |
| 438 | + } |
| 439 | + } |
408 | 440 |
|
409 |
| - return Flux.concat(firstQuery, secondQuery, thirdQuery, connection.close()); |
410 |
| - }) |
411 |
| - .as(StepVerifier::create) |
412 |
| - .expectNext(1, 2, 3) |
413 |
| - .verifyComplete(); |
| 441 | + @ParameterizedTest |
| 442 | + @ValueSource(strings = {"transaction", "statement"}) |
| 443 | + void statementCacheDoesntWorkWithTransactionAndStatementModes(String poolMode) { |
| 444 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 445 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, -1); |
| 446 | + |
| 447 | + connectionFactory.create().flatMapMany(connection -> { |
| 448 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 449 | + |
| 450 | + return Flux.concat(q1, q1, connection.close()); |
| 451 | + }) |
| 452 | + .as(StepVerifier::create) |
| 453 | + .expectNext(1) |
| 454 | + .verifyErrorMatches(e -> { |
| 455 | + if (!(e instanceof R2dbcBadGrammarException)) { |
| 456 | + return false; |
| 457 | + } |
| 458 | + if (!(e instanceof PostgresqlException)) { |
| 459 | + return false; |
| 460 | + } |
| 461 | + PostgresqlException pgException = (PostgresqlException) e; |
| 462 | + ErrorDetails errorDetails = pgException.getErrorDetails(); |
| 463 | + return errorDetails.getCode().equals("26000") && errorDetails.getMessage().equals("prepared statement \"S_0\" does not exist"); |
| 464 | + }); |
| 465 | + } |
414 | 466 | }
|
415 | 467 |
|
416 |
| - private PostgresqlConnectionFactory createConnectionFactory(int statementCacheSize) { |
| 468 | + private PostgresqlConnectionFactory createConnectionFactory(PgBouncer pgBouncer, int statementCacheSize) { |
417 | 469 | return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
|
418 |
| - .host(SERVER.getHost()) |
419 |
| - .port(SERVER.getPort()) |
| 470 | + .host(pgBouncer.getHost()) |
| 471 | + .port(pgBouncer.getPort()) |
420 | 472 | .username(SERVER.getUsername())
|
421 | 473 | .password(SERVER.getPassword())
|
422 | 474 | .database(SERVER.getDatabase())
|
|
0 commit comments