13
13
import org .apache .lucene .search .TotalHits ;
14
14
import org .elasticsearch .TransportVersion ;
15
15
import org .elasticsearch .TransportVersions ;
16
+ import org .elasticsearch .action .ActionListener ;
17
+ import org .elasticsearch .action .support .SubscribableListener ;
16
18
import org .elasticsearch .common .io .stream .DelayableWriteable ;
17
19
import org .elasticsearch .common .io .stream .StreamInput ;
18
20
import org .elasticsearch .common .io .stream .StreamOutput ;
19
21
import org .elasticsearch .common .lucene .search .TopDocsAndMaxScore ;
20
22
import org .elasticsearch .core .Nullable ;
21
23
import org .elasticsearch .core .RefCounted ;
22
- import org .elasticsearch .core .Releasable ;
23
24
import org .elasticsearch .core .Releasables ;
24
25
import org .elasticsearch .core .SimpleRefCounted ;
25
26
import org .elasticsearch .search .DocValueFormat ;
28
29
import org .elasticsearch .search .SearchShardTarget ;
29
30
import org .elasticsearch .search .aggregations .InternalAggregation ;
30
31
import org .elasticsearch .search .aggregations .InternalAggregations ;
32
+ import org .elasticsearch .search .aggregations .support .AggregationContext ;
31
33
import org .elasticsearch .search .internal .ShardSearchContextId ;
32
34
import org .elasticsearch .search .internal .ShardSearchRequest ;
33
35
import org .elasticsearch .search .profile .SearchProfileDfsPhaseResult ;
37
39
import org .elasticsearch .transport .LeakTracker ;
38
40
39
41
import java .io .IOException ;
40
- import java .util .ArrayList ;
41
- import java .util .List ;
42
42
43
43
import static org .elasticsearch .common .lucene .Lucene .readTopDocs ;
44
44
import static org .elasticsearch .common .lucene .Lucene .writeTopDocs ;
@@ -75,7 +75,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
75
75
76
76
private final RefCounted refCounted ;
77
77
78
- private final List < Releasable > toRelease ;
78
+ private final SubscribableListener < Void > aggsContextReleased ;
79
79
80
80
public QuerySearchResult () {
81
81
this (false );
@@ -99,22 +99,22 @@ public QuerySearchResult(StreamInput in, boolean delayedAggregations) throws IOE
99
99
readFromWithId (id , in , delayedAggregations );
100
100
}
101
101
refCounted = null ;
102
- toRelease = null ;
102
+ aggsContextReleased = null ;
103
103
}
104
104
105
105
public QuerySearchResult (ShardSearchContextId contextId , SearchShardTarget shardTarget , ShardSearchRequest shardSearchRequest ) {
106
106
this .contextId = contextId ;
107
107
setSearchShardTarget (shardTarget );
108
108
isNull = false ;
109
109
setShardSearchRequest (shardSearchRequest );
110
- this .toRelease = new ArrayList <>();
111
110
this .refCounted = LeakTracker .wrap (new SimpleRefCounted ());
111
+ this .aggsContextReleased = new SubscribableListener <>();
112
112
}
113
113
114
114
private QuerySearchResult (boolean isNull ) {
115
115
this .isNull = isNull ;
116
116
this .refCounted = null ;
117
- toRelease = null ;
117
+ aggsContextReleased = null ;
118
118
}
119
119
120
120
/**
@@ -275,16 +275,24 @@ public void releaseAggs() {
275
275
aggregations .close ();
276
276
aggregations = null ;
277
277
}
278
+ releaseAggsContext ();
278
279
}
279
280
280
- public void addReleasable ( Releasable releasable ) {
281
- toRelease . add ( releasable );
281
+ public void addAggregationContext ( AggregationContext aggsContext ) {
282
+ aggsContextReleased . addListener ( ActionListener . releasing ( aggsContext ) );
282
283
}
283
284
284
285
public void aggregations (InternalAggregations aggregations ) {
285
286
assert this .aggregations == null : "aggregations already set to [" + this .aggregations + "]" ;
286
287
this .aggregations = aggregations == null ? null : DelayableWriteable .referencing (aggregations );
287
288
hasAggs = aggregations != null ;
289
+ releaseAggsContext ();
290
+ }
291
+
292
+ private void releaseAggsContext () {
293
+ if (aggsContextReleased != null ) {
294
+ aggsContextReleased .onResponse (null );
295
+ }
288
296
}
289
297
290
298
@ Nullable
@@ -547,7 +555,7 @@ public boolean tryIncRef() {
547
555
public boolean decRef () {
548
556
if (refCounted != null ) {
549
557
if (refCounted .decRef ()) {
550
- Releasables . close ( toRelease );
558
+ aggsContextReleased . onResponse ( null );
551
559
return true ;
552
560
}
553
561
return false ;
0 commit comments