Skip to content

Commit 5c7b40a

Browse files
authored
Merge pull request #1667 from wooEnrico/main
fix response close for getRowsUpdated(#1538)
2 parents 548525b + b2a3ac4 commit 5c7b40a

File tree

2 files changed

+23
-24
lines changed

2 files changed

+23
-24
lines changed

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,24 @@ public class ClickHouseResult implements Result {
2020

2121
private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);
2222

23-
private final Flux<? extends Result.Segment> rowSegments;
24-
private final Mono<? extends Result.Segment> updatedCount;
2523
private final Flux<? extends Result.Segment> segments;
2624

2725
ClickHouseResult(ClickHouseResponse response) {
28-
this.rowSegments = Mono.just(response)
26+
Flux<? extends RowSegment> rowSegments = Mono.just(response)
2927
.flatMapMany(resp -> Flux
3028
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
3129
.map(rec -> ClickHousePair.of(resp.getColumns(), rec))))
3230
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
3331
.map(RowSegment::new);
34-
this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
32+
Mono<? extends UpdateCount> updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
3533
.map(ClickHouseResponseSummary::getProgress)
3634
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
3735
.map(UpdateCount::new);
38-
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
36+
this.segments = Flux.concat(updatedCount, rowSegments).doOnComplete(response::close);
3937
}
4038

41-
ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
42-
this.rowSegments = rowSegments;
43-
this.updatedCount = updatedCount;
44-
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
39+
ClickHouseResult(Flux<? extends Result.Segment> rowSegments) {
40+
this.segments = rowSegments;
4541
}
4642

4743
/**
@@ -51,12 +47,15 @@ public class ClickHouseResult implements Result {
5147
*/
5248
@Override
5349
public Mono<Long> getRowsUpdated() {
54-
return updatedCount.map(val -> ((UpdateCount) val).value());
50+
return this.segments.filter(segment -> segment instanceof UpdateCount)
51+
.cast(UpdateCount.class)
52+
.map(UpdateCount::value)
53+
.reduce(Long::sum);
5554
}
5655

5756
@Override
5857
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
59-
return rowSegments.cast(RowSegment.class)
58+
return this.segments.filter(segment -> segment instanceof RowSegment).cast(RowSegment.class)
6059
.map(RowSegment::row).handle((row, sink) -> {
6160
try {
6261
sink.next(biFunction.apply(row, row.getMetadata()));
@@ -68,7 +67,7 @@ public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction
6867

6968
@Override
7069
public Result filter(Predicate<Segment> predicate) {
71-
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
70+
return new ClickHouseResult(segments.filter(predicate));
7271
}
7372

7473
@Override

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult091.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,24 @@ class ClickHouseResult implements Result {
2020

2121
private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);
2222

23-
private final Flux<? extends Result.Segment> rowSegments;
24-
private final Mono<? extends Result.Segment> updatedCount;
2523
private final Flux<? extends Result.Segment> segments;
2624

2725
ClickHouseResult(ClickHouseResponse response) {
28-
this.rowSegments = Mono.just(response)
26+
Flux<? extends RowSegment> rowSegments = Mono.just(response)
2927
.flatMapMany(resp -> Flux
3028
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
3129
.map(rec -> ClickHousePair.of(resp.getColumns(), rec))))
3230
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
3331
.map(RowSegment::new);
34-
this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
32+
Mono<? extends UpdateCount> updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
3533
.map(ClickHouseResponseSummary::getProgress)
3634
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
3735
.map(UpdateCount::new);
38-
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
36+
this.segments = Flux.concat(updatedCount, rowSegments).doOnComplete(response::close);
3937
}
4038

41-
ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
42-
this.rowSegments = rowSegments;
43-
this.updatedCount = updatedCount;
44-
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
39+
ClickHouseResult(Flux<? extends Result.Segment> rowSegments) {
40+
this.segments = rowSegments;
4541
}
4642

4743
/**
@@ -51,12 +47,16 @@ class ClickHouseResult implements Result {
5147
*/
5248
@Override
5349
public Mono<Integer> getRowsUpdated() {
54-
return updatedCount.map(val -> (int) ((UpdateCount) val).value());
50+
return this.segments.filter(segment -> segment instanceof UpdateCount)
51+
.cast(UpdateCount.class)
52+
.map(UpdateCount::value)
53+
.reduce(Long::sum)
54+
.map(Math::toIntExact);
5555
}
5656

5757
@Override
5858
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
59-
return rowSegments.cast(RowSegment.class)
59+
return this.segments.filter(segment -> segment instanceof RowSegment).cast(RowSegment.class)
6060
.map(RowSegment::row).handle((row, sink) -> {
6161
try {
6262
sink.next(biFunction.apply(row, row.getMetadata()));
@@ -68,7 +68,7 @@ public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction
6868

6969
@Override
7070
public Result filter(Predicate<Segment> predicate) {
71-
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
71+
return new ClickHouseResult(segments.filter(predicate));
7272
}
7373

7474
@Override

0 commit comments

Comments
 (0)