Skip to content

Commit 13d4fb4

Browse files
committed
[Improve] implicitUtil improvement
1 parent 203beb8 commit 13d4fb4

File tree

81 files changed

+54
-5444
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+54
-5444
lines changed

streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object FileUtils {
5151
if (input == null) {
5252
throw new RuntimeException("The inputStream can not be null")
5353
}
54-
input.autoClose(in => {
54+
input.using(in => {
5555
val b = new Array[Byte](4)
5656
in.read(b, 0, b.length)
5757
bytesToHexString(b)
@@ -174,7 +174,7 @@ object FileUtils {
174174

175175
@throws[IOException]
176176
def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
177-
in.autoClose(is => {
177+
in.using(is => {
178178
var toRead = array.length
179179
var ret = 0
180180
var off = 0
@@ -196,7 +196,7 @@ object FileUtils {
196196
val array = new Array[Byte](len.toInt)
197197
Files
198198
.newInputStream(file.toPath)
199-
.autoClose(is => {
199+
.using(is => {
200200
readInputStream(is, array)
201201
new String(array, StandardCharsets.UTF_8)
202202
})
@@ -215,7 +215,7 @@ object FileUtils {
215215
@throws[IOException]
216216
def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
217217
var readSize = maxSize
218-
new RandomAccessFile(file, "r").autoClose(raFile => {
218+
new RandomAccessFile(file, "r").using(raFile => {
219219
if (raFile.length > maxSize) {
220220
raFile.seek(raFile.length - maxSize)
221221
} else if (raFile.length < maxSize) {
@@ -249,7 +249,7 @@ object FileUtils {
249249
throw new IllegalArgumentException(
250250
s"The startOffset $startOffset is great than the file length ${file.length}")
251251
}
252-
new RandomAccessFile(file, "r").autoClose(raFile => {
252+
new RandomAccessFile(file, "r").using(raFile => {
253253
val readSize = Math.min(maxSize, file.length - startOffset)
254254
raFile.seek(startOffset)
255255
val fileContent = new Array[Byte](readSize.toInt)
@@ -275,7 +275,7 @@ object FileUtils {
275275
if (file.exists && file.isFile) {
276276
Files
277277
.lines(Paths.get(path))
278-
.autoClose(stream =>
278+
.using(stream =>
279279
stream
280280
.skip(offset)
281281
.limit(limit)

streampark-common/src/main/scala/org/apache/streampark/common/util/Implicits.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
package org.apache.streampark.common.util
1919

20-
import org.apache.streampark.common.util.Utils.close
21-
2220
import java.lang.{Boolean => JavaBool, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
2321

2422
import scala.collection.convert.{DecorateAsJava, DecorateAsScala, ToJavaImplicits, ToScalaImplicits}
2523
import scala.language.implicitConversions
24+
import scala.util.Try
2625

2726
object Implicits extends ToScalaImplicits with ToJavaImplicits with DecorateAsJava with DecorateAsScala {
2827

@@ -55,17 +54,25 @@ object Implicits extends ToScalaImplicits with ToJavaImplicits with DecorateAsJa
5554
type JavaShort = java.lang.Short
5655

5756
implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) {
58-
59-
implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
57+
implicit def using[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
58+
var exception: Option[Throwable] = null
6059
try {
6160
func(autoCloseable)
6261
} catch {
63-
case e: Throwable if excFunc != null => excFunc(e)
62+
case e: Throwable =>
63+
exception = Some(e)
64+
if (excFunc != null) {
65+
excFunc(e)
66+
} else {
67+
throw e
68+
}
6469
} finally {
65-
close(autoCloseable)
70+
Try(autoCloseable.close()).recover { case e =>
71+
exception.foreach(originalEx => e.addSuppressed(originalEx))
72+
throw e
73+
}
6674
}
6775
}
68-
6976
}
7077

7178
implicit class StringImplicits(v: String) {

streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.streampark.common.util
1919

2020
import org.apache.streampark.common.util.Implicits._
21-
import org.apache.streampark.common.util.Implicits.AutoCloseImplicits
2221

2322
import org.apache.commons.lang3.StringUtils
2423

@@ -75,7 +74,7 @@ object Utils extends Logger {
7574
def getJarManifest(jarFile: File): jar.Manifest = {
7675
requireCheckJarFile(jarFile.toURL)
7776
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile)))
78-
.autoClose(_.getManifest)
77+
.using(_.getManifest)
7978
}
8079

8180
def getJarManClass(jarFile: File): String = {

streampark-common/src/test/scala/org/apache/streampark/common/util/ImplicitsTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.scalatest.funsuite.AnyFunSuite
2323

2424
class ImplicitsTest extends AnyFunSuite {
2525
test(
26-
"AutoCloseImplicits.autoClose should close the resource after execution and handle exceptions") {
26+
"AutoCloseImplicits.using should close the resource after execution and handle exceptions") {
2727
class MockResource extends AutoCloseable {
2828
var closed = false
2929
def close(): Unit = closed = true
@@ -35,7 +35,7 @@ class ImplicitsTest extends AnyFunSuite {
3535

3636
val mockResource = new MockResource
3737
assertThrows[RuntimeException] {
38-
mockResource.autoClose(operation)
38+
mockResource.using(operation)
3939
}
4040
assert(mockResource.closed)
4141
}

streampark-console/streampark-console-service/pom.xml

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -479,18 +479,6 @@
479479
<version>${testcontainer.version}</version>
480480
<scope>test</scope>
481481
</dependency>
482-
<dependency>
483-
<groupId>org.apache.streampark</groupId>
484-
<artifactId>streampark-flink-connector-plugin</artifactId>
485-
<version>2.2.0-SNAPSHOT</version>
486-
<scope>test</scope>
487-
<exclusions>
488-
<exclusion>
489-
<groupId>org.xerial.snappy</groupId>
490-
<artifactId>snappy-java</artifactId>
491-
</exclusion>
492-
</exclusions>
493-
</dependency>
494482
<dependency>
495483
<groupId>org.apache.flink</groupId>
496484
<artifactId>flink-kubernetes</artifactId>
@@ -612,12 +600,6 @@
612600
<version>${project.version}</version>
613601
<outputDirectory>${project.build.directory}/lib</outputDirectory>
614602
</dependency>
615-
<dependency>
616-
<groupId>org.apache.streampark</groupId>
617-
<artifactId>streampark-flink-connector-plugin</artifactId>
618-
<version>${project.version}</version>
619-
<outputDirectory>${project.build.directory}/plugins</outputDirectory>
620-
</dependency>
621603
</artifactItems>
622604
</configuration>
623605
<executions>

streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java

Lines changed: 0 additions & 92 deletions
This file was deleted.

0 commit comments

Comments
 (0)