Skip to content

Commit 72a9ccf

Browse files
committed
fixes
Signed-off-by: Kai Huang <[email protected]>
1 parent 540e9de commit 72a9ccf

File tree

1 file changed

+82
-48
lines changed

1 file changed

+82
-48
lines changed

legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java

Lines changed: 82 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.HashSet;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.Optional;
1415
import java.util.Set;
1516
import java.util.stream.Stream;
1617
import org.apache.lucene.search.TotalHits;
@@ -96,61 +97,90 @@ public void run() throws IOException, SqlParseException {
9697

9798
LOG.debug("🔍 Starting join execution, checking for JOIN_TIME_OUT hints...");
9899

99-
// ✅ Extract JOIN_TIME_OUT hint and create PIT with custom keepalive
100-
TimeValue customKeepAlive = extractJoinTimeoutFromHints();
100+
// ✅ Create PIT with appropriate timeout
101+
createPointInTimeWithCustomTimeout();
101102

102-
if (customKeepAlive != null) {
103-
LOG.info(
104-
"✅ Using custom PIT keepalive from JOIN_TIME_OUT hint: {} seconds ({}ms)",
105-
customKeepAlive.getSeconds(),
106-
customKeepAlive.getMillis());
107-
pit = new PointInTimeHandlerImpl(client, indices, customKeepAlive);
108-
} else {
109-
LOG.info("⚠️ No JOIN_TIME_OUT hint found, using default PIT keepalive");
110-
pit = new PointInTimeHandlerImpl(client, indices);
111-
}
112-
113-
pit.create();
103+
// Execute the query
114104
results = innerRun();
105+
106+
// Record execution time
115107
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
116108
this.metaResults.setTookImMilli(joinTimeInMilli);
117-
} catch (Exception e) {
118-
LOG.error("Failed during join query run.", e);
109+
110+
} catch (IOException e) {
111+
LOG.error("IO error during join query execution", e);
112+
throw e;
113+
} catch (SqlParseException e) {
114+
LOG.error("SQL parsing error during join query execution", e);
115+
throw e;
116+
} catch (RuntimeException e) {
117+
LOG.error("Runtime error during join query execution", e);
119118
throw new IllegalStateException("Error occurred during join query run", e);
119+
} catch (Exception e) {
120+
LOG.error("Unexpected error during join query execution", e);
121+
throw new IllegalStateException("Unexpected error occurred during join query run", e);
120122
} finally {
123+
cleanupPointInTime();
124+
}
125+
}
126+
127+
/** Create Point-in-Time with custom timeout if JOIN_TIME_OUT hint is present */
128+
private void createPointInTimeWithCustomTimeout() {
129+
Optional<TimeValue> customKeepAlive = extractJoinTimeoutFromHints();
130+
131+
if (customKeepAlive.isPresent()) {
132+
TimeValue keepAlive = customKeepAlive.get();
133+
LOG.info(
134+
"✅ Using custom PIT keepalive from JOIN_TIME_OUT hint: {} seconds ({}ms)",
135+
keepAlive.getSeconds(),
136+
keepAlive.getMillis());
137+
pit = new PointInTimeHandlerImpl(client, indices, keepAlive);
138+
} else {
139+
LOG.info("⚠️ No JOIN_TIME_OUT hint found, using default PIT keepalive");
140+
pit = new PointInTimeHandlerImpl(client, indices);
141+
}
142+
143+
pit.create();
144+
}
145+
146+
/** Clean up Point-in-Time resources safely */
147+
private void cleanupPointInTime() {
148+
if (pit != null) {
121149
try {
122150
pit.delete();
151+
LOG.debug("Successfully deleted PIT");
123152
} catch (RuntimeException e) {
124153
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
125-
LOG.error("Error deleting point in time {} ", pit);
154+
LOG.error("Error deleting point in time: {}", e.getMessage(), e);
126155
}
127156
}
128157
}
129158

130-
protected TimeValue extractJoinTimeoutFromHints() {
159+
protected Optional<TimeValue> extractJoinTimeoutFromHints() {
131160
try {
132161
LOG.debug(
133162
"Starting hint extraction from request builder: {}",
134163
requestBuilder != null ? requestBuilder.getClass().getSimpleName() : "null");
135164

136165
// Check first table for hints
137-
TimeValue timeout = extractJoinTimeoutFromTable("firstTable", requestBuilder.getFirstTable());
138-
if (timeout != null) {
166+
Optional<TimeValue> timeout =
167+
extractJoinTimeoutFromTable("firstTable", requestBuilder.getFirstTable());
168+
if (timeout.isPresent()) {
139169
return timeout;
140170
}
141171

142172
// Check second table for hints if not found in first
143173
timeout = extractJoinTimeoutFromTable("secondTable", requestBuilder.getSecondTable());
144-
if (timeout != null) {
174+
if (timeout.isPresent()) {
145175
return timeout;
146176
}
147177

148178
LOG.debug("No JOIN_TIME_OUT hint found in either table");
149-
return null;
179+
return Optional.empty();
150180

151181
} catch (Exception e) {
152182
LOG.warn("Error extracting JOIN_TIME_OUT hint, using default keepalive", e);
153-
return null;
183+
return Optional.empty();
154184
}
155185
}
156186

@@ -159,50 +189,54 @@ protected TimeValue extractJoinTimeoutFromHints() {
159189
*
160190
* @param tableName descriptive name for logging (e.g., "firstTable", "secondTable")
161191
* @param table the table request builder to examine
162-
* @return TimeValue if JOIN_TIME_OUT hint found, null otherwise
192+
* @return Optional containing TimeValue if JOIN_TIME_OUT hint found, empty otherwise
163193
*/
164-
private TimeValue extractJoinTimeoutFromTable(String tableName, TableInJoinRequestBuilder table) {
194+
private Optional<TimeValue> extractJoinTimeoutFromTable(
195+
String tableName, TableInJoinRequestBuilder table) {
165196
if (table == null) {
166197
LOG.debug("{} is null", tableName);
167-
return null;
198+
return Optional.empty();
168199
}
169200

170201
Select originalSelect = table.getOriginalSelect();
171202
if (originalSelect == null) {
172203
LOG.debug("{}.getOriginalSelect() is null", tableName);
173-
return null;
204+
return Optional.empty();
174205
}
175206

176207
List<Hint> hints = originalSelect.getHints();
177208
int hintCount = hints != null ? hints.size() : 0;
178209
LOG.debug("{} has {} hints", tableName, hintCount);
179210

180-
if (hints != null && !hints.isEmpty()) {
181-
LOG.debug("Examining hints in {}:", tableName);
182-
for (int i = 0; i < hints.size(); i++) {
183-
Hint hint = hints.get(i);
184-
LOG.debug(
185-
" Hint[{}]: type={}, params={}",
186-
i,
187-
hint.getType(),
188-
hint.getParams() != null ? java.util.Arrays.toString(hint.getParams()) : "null");
189-
190-
// Check if this is the JOIN_TIME_OUT hint we're looking for
191-
if (hint.getType() == HintType.JOIN_TIME_OUT) {
192-
Object[] params = hint.getParams();
193-
if (params != null && params.length > 0) {
194-
Integer timeoutSeconds = (Integer) params[0];
195-
LOG.info("Found JOIN_TIME_OUT hint in {}: {} seconds", tableName, timeoutSeconds);
196-
return TimeValue.timeValueSeconds(timeoutSeconds);
197-
} else {
198-
LOG.warn("JOIN_TIME_OUT hint found in {} but has no parameters", tableName);
199-
}
211+
if (hints == null || hints.isEmpty()) {
212+
LOG.debug("No hints found in {}", tableName);
213+
return Optional.empty();
214+
}
215+
216+
LOG.debug("Examining hints in {}:", tableName);
217+
for (int i = 0; i < hints.size(); i++) {
218+
Hint hint = hints.get(i);
219+
LOG.debug(
220+
" Hint[{}]: type={}, params={}",
221+
i,
222+
hint.getType(),
223+
hint.getParams() != null ? java.util.Arrays.toString(hint.getParams()) : "null");
224+
225+
// Check if this is the JOIN_TIME_OUT hint we're looking for
226+
if (hint.getType() == HintType.JOIN_TIME_OUT) {
227+
Object[] params = hint.getParams();
228+
if (params != null && params.length > 0) {
229+
Integer timeoutSeconds = (Integer) params[0];
230+
LOG.info("Found JOIN_TIME_OUT hint in {}: {} seconds", tableName, timeoutSeconds);
231+
return Optional.of(TimeValue.timeValueSeconds(timeoutSeconds));
232+
} else {
233+
LOG.warn("JOIN_TIME_OUT hint found in {} but has no parameters", tableName);
200234
}
201235
}
202236
}
203237

204238
LOG.debug("No JOIN_TIME_OUT hint found in {}", tableName);
205-
return null;
239+
return Optional.empty();
206240
}
207241

208242
protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;

0 commit comments

Comments
 (0)