Skip to content

Commit 85c7b62

Browse files
committed
Use detached buffers for Json decoding by default
JsonCodec now copies the decoded value from an attached buffer to a detached one reduce the risk of memory leaks. Detached buffers get garbage collected regardless of whether the decoded value gets consumed. Attached buffers are reference-counted and don't get automatically garbage-collected which requires decoded values to be consumed. If a value backed by an attached buffer doesn't get consumed, then the memory backing the data structure isn't reclaimed and creates a memory leak. Since this behavior is non-obvious, we default to the less-efficient approach to copy buffers which prevents the risk of memory leaks. [resolves #330]
1 parent 4af0379 commit 85c7b62

File tree

9 files changed

+124
-22
lines changed

9 files changed

+124
-22
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
8080
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
8181
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
8282
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
83+
| `preferAttachedBuffers` |Configure whether codecs should prefer attached data buffers. The default is `false`, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as `Json` to avoid memory leaks.
8384
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `0` disables the cache. Any other value specifies the cache size.
8485
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(Optional)_
8586
| `schema` | The search path to set. _(Optional)_

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.r2dbc.postgresql.client.SSLConfig;
2323
import io.r2dbc.postgresql.client.SSLMode;
2424
import io.r2dbc.postgresql.codec.Codec;
25+
import io.r2dbc.postgresql.codec.Codecs;
26+
import io.r2dbc.postgresql.codec.Json;
2527
import io.r2dbc.postgresql.extension.CodecRegistrar;
2628
import io.r2dbc.postgresql.extension.Extension;
2729
import io.r2dbc.postgresql.util.Assert;
@@ -81,6 +83,8 @@ public final class PostgresqlConnectionConfiguration {
8183

8284
private final int port;
8385

86+
private final boolean preferAttachedBuffers;
87+
8488
private final int preparedStatementCacheQueries;
8589

8690
private final String socket;
@@ -95,7 +99,7 @@ public final class PostgresqlConnectionConfiguration {
9599

96100
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
97101
ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable LoopResources loopResources,
98-
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, int preparedStatementCacheQueries, @Nullable String schema,
102+
@Nullable Map<String, String> options, @Nullable CharSequence password, int port,boolean preferAttachedBuffers, int preparedStatementCacheQueries, @Nullable String schema,
99103
@Nullable String socket, SSLConfig sslConfig, boolean tcpKeepAlive, boolean tcpNoDelay, String username) {
100104
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
101105
this.autodetectExtensions = autodetectExtensions;
@@ -114,6 +118,7 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
114118

115119
this.password = password;
116120
this.port = port;
121+
this.preferAttachedBuffers = preferAttachedBuffers;
117122
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
118123
this.socket = socket;
119124
this.sslConfig = sslConfig;
@@ -146,6 +151,7 @@ public String toString() {
146151
", options='" + this.options + '\'' +
147152
", password='" + obfuscate(this.password != null ? this.password.length() : 0) + '\'' +
148153
", port=" + this.port +
154+
", preferAttachedBuffers=" + this.preferAttachedBuffers +
149155
", socket=" + this.socket +
150156
", tcpKeepAlive=" + this.tcpKeepAlive +
151157
", tcpNoDelay=" + this.tcpNoDelay +
@@ -213,6 +219,10 @@ int getPort() {
213219
return this.port;
214220
}
215221

222+
boolean isPreferAttachedBuffers() {
223+
return this.preferAttachedBuffers;
224+
}
225+
216226
int getPreparedStatementCacheQueries() {
217227
return this.preparedStatementCacheQueries;
218228
}
@@ -305,6 +315,8 @@ public static final class Builder {
305315

306316
private int port = DEFAULT_PORT;
307317

318+
private boolean preferAttachedBuffers = false;
319+
308320
private int preparedStatementCacheQueries = -1;
309321

310322
@Nullable
@@ -387,7 +399,8 @@ public PostgresqlConnectionConfiguration build() {
387399
}
388400

389401
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
390-
this.host, this.loopResources, this.options, this.password, this.port, this.preparedStatementCacheQueries, this.schema, this.socket, this.createSslConfig(), this.tcpKeepAlive,
402+
this.host, this.loopResources, this.options, this.password, this.port, this.preferAttachedBuffers,
403+
this.preparedStatementCacheQueries, this.schema, this.socket, this.createSslConfig(), this.tcpKeepAlive,
391404
this.tcpNoDelay, this.username);
392405
}
393406

@@ -552,6 +565,20 @@ public Builder port(int port) {
552565
return this;
553566
}
554567

568+
/**
569+
* Configure whether {@link Codecs codecs} should prefer attached data buffers. The default is {@code false}, meaning that codecs will copy data from the input buffer into a {@code byte[]}
570+
* or similar data structure that is enabled for garbage collection. Using attached buffers is more efficient but comes with the requirement that decoded values (such as {@link Json}) must
571+
* be consumed to release attached buffers to avoid memory leaks.
572+
*
573+
* @param preferAttachedBuffers the flag whether to prefer attached buffers
574+
* @return this {@link Builder}
575+
* @since 0.8.5
576+
*/
577+
public Builder preferAttachedBuffers(boolean preferAttachedBuffers) {
578+
this.preferAttachedBuffers = preferAttachedBuffers;
579+
return this;
580+
}
581+
555582
/**
556583
* Configure the preparedStatementCacheQueries. The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
557584
*

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @N
160160
)
161161
.flatMap(client -> {
162162

163-
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
163+
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator(), this.configuration.isPreferAttachedBuffers());
164164
StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
165165

166166
// early connection object to retrieve initialization details

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.netty.handler.ssl.SslContextBuilder;
2020
import io.r2dbc.postgresql.client.DefaultHostnameVerifier;
2121
import io.r2dbc.postgresql.client.SSLMode;
22+
import io.r2dbc.postgresql.codec.Codecs;
23+
import io.r2dbc.postgresql.codec.Json;
2224
import io.r2dbc.postgresql.util.Assert;
2325
import io.r2dbc.spi.ConnectionFactoryOptions;
2426
import io.r2dbc.spi.ConnectionFactoryProvider;
@@ -86,6 +88,15 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
8688
*/
8789
public static final String LEGACY_POSTGRESQL_DRIVER = "postgres";
8890

91+
/**
92+
* Configure whether {@link Codecs codecs} should prefer attached data buffers. The default is {@code false}, meaning that codecs will copy data from the input buffer into a {@code byte[]}
93+
* or similar data structure that is enabled for garbage collection. Using attached buffers is more efficient but comes with the requirement that decoded values (such as {@link Json}) must
94+
* be consumed to release attached buffers to avoid memory leaks.
95+
*
96+
* @since 0.8.5
97+
*/
98+
public static final Option<Boolean> PREFER_ATTACHED_BUFFERS = Option.valueOf("preferAttachedBuffers");
99+
89100
/**
90101
* Determine the number of queries that are cached in each connection.
91102
* The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
@@ -213,6 +224,7 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
213224
mapper.from(OPTIONS).map(PostgresqlConnectionFactoryProvider::convertToMap).to(builder::options);
214225
mapper.from(PASSWORD).to(builder::password);
215226
mapper.from(PORT).map(OptionMapper::toInteger).to(builder::port);
227+
mapper.from(PREFER_ATTACHED_BUFFERS).map(OptionMapper::toBoolean).to(builder::preferAttachedBuffers);
216228
mapper.from(PREPARED_STATEMENT_CACHE_QUERIES).map(OptionMapper::toInteger).to(builder::preparedStatementCacheQueries);
217229
mapper.from(SOCKET).to(builder::socket).otherwise(() -> {
218230
builder.host(options.getRequiredValue(HOST));

src/main/java/io/r2dbc/postgresql/codec/DefaultCodecs.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,29 @@
3131

3232
/**
3333
* The default {@link Codec} implementation. Delegates to type-specific codec implementations.
34+
* <p>Codecs can be configured to prefer or avoid attached buffers for certain data types. Using attached buffers is more memory-efficient as data doesn't need to be copied. In turn, attached
35+
* buffers require release or consumption to avoid memory leaks. By default, codecs don't use attached buffers to minimize the risk of memory leaks.</p>
3436
*/
3537
public final class DefaultCodecs implements Codecs, CodecRegistry {
3638

3739
private final List<Codec<?>> codecs;
3840

3941
/**
40-
* Create a new instance of {@link DefaultCodecs}.
42+
* Create a new instance of {@link DefaultCodecs} preferring detached (copied buffers).
4143
*
4244
* @param byteBufAllocator the {@link ByteBufAllocator} to use for encoding
4345
*/
4446
public DefaultCodecs(ByteBufAllocator byteBufAllocator) {
47+
this(byteBufAllocator, false);
48+
}
49+
50+
/**
51+
* Create a new instance of {@link DefaultCodecs}.
52+
*
53+
* @param byteBufAllocator the {@link ByteBufAllocator} to use for encoding
54+
* @param preferAttachedBuffers whether to prefer attached (pooled) {@link ByteBuf buffers}. Use {@code false} (default) to use detached buffers which minimize the risk of memory leaks.
55+
*/
56+
public DefaultCodecs(ByteBufAllocator byteBufAllocator, boolean preferAttachedBuffers) {
4557
Assert.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
4658

4759
this.codecs = new ArrayList<>(Arrays.asList(
@@ -74,7 +86,7 @@ public DefaultCodecs(ByteBufAllocator byteBufAllocator) {
7486
new ZoneIdCodec(byteBufAllocator),
7587

7688
// JSON
77-
new JsonCodec(byteBufAllocator),
89+
new JsonCodec(byteBufAllocator, preferAttachedBuffers),
7890
new JsonByteArrayCodec(byteBufAllocator),
7991
new JsonByteBufCodec(byteBufAllocator),
8092
new JsonByteBufferCodec(byteBufAllocator),

src/main/java/io/r2dbc/postgresql/codec/Json.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public String asString() {
260260

261261
@Override
262262
public String toString() {
263-
return "JsonByteBufInput{" +
263+
return "JsonByteArrayInput{" +
264264
asString() +
265265
'}';
266266
}
@@ -325,6 +325,13 @@ public String asString() {
325325
return new String(asArray(), StandardCharsets.UTF_8);
326326
}
327327

328+
@Override
329+
public String toString() {
330+
return "JsonInputStreamInput{" +
331+
this.value +
332+
'}';
333+
}
334+
328335
}
329336

330337
/**

src/main/java/io/r2dbc/postgresql/codec/JsonCodec.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ final class JsonCodec extends AbstractJsonCodec<Json> {
3737

3838
private final ByteBufAllocator byteBufAllocator;
3939

40-
JsonCodec(ByteBufAllocator byteBufAllocator) {
40+
private final boolean preferAttachedBuffers;
41+
42+
JsonCodec(ByteBufAllocator byteBufAllocator, boolean preferAttachedBuffers) {
4143
super(Json.class);
4244
this.byteBufAllocator = Assert.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
45+
this.preferAttachedBuffers = preferAttachedBuffers;
4346
}
4447

4548
@Override
@@ -48,12 +51,15 @@ Json doDecode(ByteBuf buffer, PostgresqlObjectId dataType, Format format, Class<
4851
Assert.requireNonNull(format, "format must not be null");
4952
Assert.requireNonNull(type, "type must not be null");
5053

51-
ByteBuf slice = buffer.retainedSlice();
5254
if (dataType == JSONB && format == FORMAT_BINARY) {
53-
slice.skipBytes(1);
55+
buffer.skipBytes(1);
56+
}
57+
58+
if (this.preferAttachedBuffers) {
59+
return new Json.JsonOutput(buffer.retainedSlice());
5460
}
5561

56-
return new Json.JsonOutput(slice);
62+
return new Json.JsonByteArrayInput(ByteBufUtil.getBytes(buffer));
5763
}
5864

5965
@Override
@@ -68,7 +74,7 @@ Parameter doEncode(Json value) {
6874
Object toEncode;
6975

7076
if (value instanceof Json.JsonInput) {
71-
toEncode = ((Json.JsonInput) value).value;
77+
toEncode = ((Json.JsonInput<?>) value).value;
7278
} else {
7379
toEncode = ((Json.JsonOutput) value).buffer;
7480
((Json.JsonOutput) value).released = true;

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProviderUnitTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.LEGACY_POSTGRESQL_DRIVER;
3232
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.OPTIONS;
3333
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER;
34+
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.PREFER_ATTACHED_BUFFERS;
3435
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.PREPARED_STATEMENT_CACHE_QUERIES;
3536
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.SOCKET;
3637
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.SSL_CONTEXT_BUILDER_CUSTOMIZER;
@@ -386,4 +387,18 @@ void shouldParseOptionsProvidedAsMap() {
386387
assertThat(factory.getConfiguration().getOptions().get("default_tablespace")).isEqualTo("unknown");
387388
}
388389

390+
@Test
391+
void shouldConfigurePreferAttachedBuffers() {
392+
393+
PostgresqlConnectionFactory factory = this.provider.create(builder()
394+
.option(DRIVER, POSTGRESQL_DRIVER)
395+
.option(HOST, "test-host")
396+
.option(PASSWORD, "test-password")
397+
.option(USER, "test-user")
398+
.option(PREFER_ATTACHED_BUFFERS, true)
399+
.build());
400+
401+
assertThat(factory.getConfiguration().isPreferAttachedBuffers()).isTrue();
402+
}
403+
389404
}

src/test/java/io/r2dbc/postgresql/codec/JsonCodecUnitTests.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,49 @@ final class JsonCodecUnitTests {
4545

4646
@Test
4747
void constructorNoByteBufAllocator() {
48-
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(null))
48+
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(null, true))
4949
.withMessage("byteBufAllocator must not be null");
5050
}
5151

5252
@Test
5353
void decode() {
5454
String json = "{\"name\": \"John Doe\"}";
55-
JsonCodec jsonCodec = new JsonCodec(TEST);
55+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
5656
Json decodedBytes = jsonCodec.decode(ByteBufUtils.encode(TEST, json), JSON.getObjectId(), FORMAT_TEXT, Json.class);
5757

5858
assertThat(decodedBytes.asString()).isEqualTo(json);
5959
}
6060

61+
@Test
62+
void shouldDecodeWithAttachedBuffers() {
63+
String json = "{\"name\": \"John Doe\"}";
64+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
65+
ByteBuf encoded = ByteBufUtils.encode(TEST, json);
66+
jsonCodec.decode(encoded, JSON.getObjectId(), FORMAT_TEXT, Json.class);
67+
68+
assertThat(encoded.refCnt()).isEqualTo(2);
69+
encoded.release(2);
70+
}
71+
72+
@Test
73+
void shouldDecodeWithDetachedBuffers() {
74+
String json = "{\"name\": \"John Doe\"}";
75+
JsonCodec jsonCodec = new JsonCodec(TEST, false);
76+
ByteBuf encoded = ByteBufUtils.encode(TEST, json);
77+
jsonCodec.decode(encoded, JSON.getObjectId(), FORMAT_TEXT, Json.class);
78+
79+
assertThat(encoded.refCnt()).isEqualTo(1);
80+
encoded.release();
81+
}
82+
6183
@Test
6284
void decodeNoByteBuf() {
63-
assertThat(new JsonCodec(TEST).decode(null, JSON.getObjectId(), FORMAT_TEXT, Json.class)).isNull();
85+
assertThat(new JsonCodec(TEST, true).decode(null, JSON.getObjectId(), FORMAT_TEXT, Json.class)).isNull();
6486
}
6587

6688
@Test
6789
void doCanDecode() {
68-
JsonCodec jsonCodec = new JsonCodec(TEST);
90+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
6991

7092
assertThat(jsonCodec.doCanDecode(JSON, FORMAT_TEXT)).isTrue();
7193
assertThat(jsonCodec.doCanDecode(JSON, FORMAT_BINARY)).isTrue();
@@ -77,20 +99,20 @@ void doCanDecode() {
7799

78100
@Test
79101
void doCanDecodeNoFormat() {
80-
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST).doCanDecode(JSON, null))
102+
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST, true).doCanDecode(JSON, null))
81103
.withMessage("format must not be null");
82104
}
83105

84106
@Test
85107
void doCanDecodeNoType() {
86-
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST).doCanDecode(null, FORMAT_TEXT))
108+
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST, true).doCanDecode(null, FORMAT_TEXT))
87109
.withMessage("type must not be null");
88110
}
89111

90112
@Test
91113
void doEncode() {
92114
String json = "{\"name\":\"John Doe\"}";
93-
JsonCodec jsonCodec = new JsonCodec(TEST);
115+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
94116

95117
assertThat(jsonCodec.doEncode(Json.of(json)))
96118
.hasFormat(FORMAT_BINARY)
@@ -116,7 +138,7 @@ void doEncode() {
116138
@Test
117139
void doEncodeReleasedByteBuf() {
118140
String json = "{\"name\":\"John Doe\"}";
119-
JsonCodec jsonCodec = new JsonCodec(TEST);
141+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
120142

121143
ByteBuf buffer = TEST.buffer();
122144
buffer.writeCharSequence(json, StandardCharsets.UTF_8);
@@ -130,7 +152,7 @@ void doEncodeReleasedByteBuf() {
130152
@Test
131153
void doEncodeReleasedJsonOutput() {
132154
String json = "{\"name\":\"John Doe\"}";
133-
JsonCodec jsonCodec = new JsonCodec(TEST);
155+
JsonCodec jsonCodec = new JsonCodec(TEST, true);
134156
Json decodedBytes = jsonCodec.decode(ByteBufUtils.encode(TEST, json), JSON.getObjectId(), FORMAT_TEXT, Json.class);
135157

136158
assertThat(jsonCodec.doEncode(decodedBytes))
@@ -141,13 +163,13 @@ void doEncodeReleasedJsonOutput() {
141163

142164
@Test
143165
void doEncodeNoValue() {
144-
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST).doEncode(null))
166+
assertThatIllegalArgumentException().isThrownBy(() -> new JsonCodec(TEST, true).doEncode(null))
145167
.withMessage("value must not be null");
146168
}
147169

148170
@Test
149171
void encodeNull() {
150-
assertThat(new JsonCodec(TEST).encodeNull())
172+
assertThat(new JsonCodec(TEST, true).encodeNull())
151173
.isEqualTo(new Parameter(FORMAT_BINARY, JSONB.getObjectId(), NULL_VALUE));
152174
}
153175

0 commit comments

Comments
 (0)