Skip to content

Commit 1c08148

Browse files
authored
Adds support for thread interruption in compilation and execution (#1211)
* Adds support for thread interruption in compilation and execution * Adds support for CLI users to use CTRL-C
1 parent 36aeaa0 commit 1c08148

File tree

11 files changed

+838
-122
lines changed

11 files changed

+838
-122
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ Thank you to all who have contributed!
2929
## [Unreleased]
3030

3131
### Added
32+
- Adds `isInterruptible` property to `CompileOptions`. The default value is `false`. Please see the KDocs for more information.
33+
- Adds support for thread interruption in compilation and execution. If you'd like to opt-in to this addition, please see
34+
the `isInterruptible` addition above for more information.
35+
- Adds support for CLI users to use CTRL-C to cancel long-running compilation/execution of queries
3236

3337
### Changed
3438

partiql-cli/src/main/kotlin/org/partiql/cli/pipeline/AbstractPipeline.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ internal sealed class AbstractPipeline(open val options: PipelineOptions) {
145145
projectionIteration(options.projectionIterationBehavior)
146146
undefinedVariable(options.undefinedVariableBehavior)
147147
typingMode(options.typingMode)
148+
isInterruptible(true)
148149
}
149150

150151
private val compilerPipeline = CompilerPipeline.build {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at:
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
12+
* language governing permissions and limitations under the License.
13+
*/
14+
15+
package org.partiql.cli.shell
16+
17+
import org.partiql.cli.pipeline.AbstractPipeline
18+
import org.partiql.lang.eval.EvaluationSession
19+
import org.partiql.lang.eval.PartiQLResult
20+
import java.util.concurrent.BlockingQueue
21+
import java.util.concurrent.TimeUnit
22+
import java.util.concurrent.atomic.AtomicBoolean
23+
24+
/**
25+
* A wrapper over [AbstractPipeline]. It constantly grabs input queries from [inputs] and places the [PartiQLResult]
26+
* in [results]. When it is done compiling a single statement, it sets [doneCompiling] to true.
27+
*/
28+
internal class RunnablePipeline(
29+
private val inputs: BlockingQueue<Input>,
30+
private val results: BlockingQueue<PartiQLResult>,
31+
val pipeline: AbstractPipeline,
32+
private val doneCompiling: AtomicBoolean
33+
) : Runnable {
34+
/**
35+
* When the Thread running this [Runnable] is interrupted, the underlying [AbstractPipeline] should catch the
36+
* interruption and fail with some exception. Then, this will break out of [run].
37+
*/
38+
override fun run() {
39+
while (true) {
40+
val input = inputs.poll(3, TimeUnit.SECONDS)
41+
if (input != null) {
42+
val result = pipeline.compile(input.input, input.session)
43+
results.put(result)
44+
doneCompiling.set(true)
45+
}
46+
}
47+
}
48+
49+
/**
50+
* Represents a PartiQL statement ([input]) and the [EvaluationSession] to evaluate with.
51+
*/
52+
internal data class Input(
53+
val input: String,
54+
val session: EvaluationSession
55+
)
56+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at:
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
12+
* language governing permissions and limitations under the License.
13+
*/
14+
15+
package org.partiql.cli.shell
16+
17+
import org.partiql.lang.eval.ExprValue
18+
import org.partiql.lang.util.ExprValueFormatter
19+
import java.io.PrintStream
20+
import java.util.concurrent.BlockingQueue
21+
import java.util.concurrent.TimeUnit
22+
import java.util.concurrent.atomic.AtomicBoolean
23+
24+
/**
25+
* A wrapper over [ExprValueFormatter]. It constantly grabs [ExprValue]s from [values] and writes them to [out].
26+
* When it is done printing a single item, it sets [donePrinting] to true.
27+
*/
28+
internal class RunnableWriter(
29+
private val out: PrintStream,
30+
private val formatter: ExprValueFormatter,
31+
private val values: BlockingQueue<ExprValue>,
32+
private val donePrinting: AtomicBoolean
33+
) : Runnable {
34+
35+
/**
36+
* When the Thread running this [Runnable] is interrupted, the underlying formatter should check the interruption
37+
* flag and fail with some exception. The formatter itself doesn't do this, but, since [ExprValue]s are lazily created,
38+
* the creation of the [ExprValue] (by means of the thunks produced by the EvaluatingCompiler) should throw an exception
39+
* when the thread is interrupted. Then, this will break out of [run].
40+
*/
41+
override fun run() {
42+
while (true) {
43+
val value = values.poll(3, TimeUnit.SECONDS)
44+
if (value != null) {
45+
out.info(BAR_1)
46+
formatter.formatTo(value, out)
47+
out.println()
48+
out.info(BAR_2)
49+
donePrinting.set(true)
50+
}
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)