Skip to content

Commit bff0465

Browse files
fix compilation errors
1 parent ce7a6c2 commit bff0465

File tree

6 files changed

+199
-83
lines changed

6 files changed

+199
-83
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/Database.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import org.jooq.DSLContext
99
import org.jooq.impl.DSL
1010

1111
/** Database object for interacting with a Jooq connection. */
12-
open class Database(private val dslContext: DSLContext) {
12+
open class Database(private val dslContext: DSLContext?) {
1313
@Throws(SQLException::class)
1414
open fun <T> query(transform: ContextQueryFunction<T>): T? {
1515
return transform.query(dslContext)
1616
}
1717

1818
@Throws(SQLException::class)
1919
open fun <T> transaction(transform: ContextQueryFunction<T>): T? {
20-
return dslContext.transactionResult { configuration: Configuration? ->
20+
return dslContext!!.transactionResult { configuration: Configuration? ->
2121
transform.query(DSL.using(configuration))
2222
}
2323
}

airbyte-cdk/java/airbyte-cdk/datastore-bigquery/build.gradle

+21
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,24 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
3+
4+
compileTestFixturesKotlin {
5+
compilerOptions {
6+
allWarningsAsErrors = false
7+
}
8+
}
9+
10+
compileTestKotlin {
11+
compilerOptions {
12+
allWarningsAsErrors = false
13+
}
14+
}
15+
16+
compileKotlin {
17+
compilerOptions {
18+
allWarningsAsErrors = false
19+
}
20+
}
21+
122
dependencies {
223
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
324
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
+89-45
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,26 @@ import com.google.common.base.Charsets
1111
import com.google.common.collect.ImmutableMap
1212
import com.google.common.collect.Streams
1313
import io.airbyte.cdk.db.SqlDatabase
14-
import org.apache.commons.lang3.StringUtils
15-
import org.apache.commons.lang3.tuple.ImmutablePair
16-
import org.slf4j.Logger
17-
import org.slf4j.LoggerFactory
18-
import org.threeten.bp.Duration
1914
import java.io.ByteArrayInputStream
2015
import java.io.IOException
2116
import java.sql.SQLException
2217
import java.util.*
2318
import java.util.function.Consumer
2419
import java.util.stream.Collectors
2520
import java.util.stream.Stream
21+
import org.apache.commons.lang3.StringUtils
22+
import org.apache.commons.lang3.tuple.ImmutablePair
23+
import org.slf4j.Logger
24+
import org.slf4j.LoggerFactory
25+
import org.threeten.bp.Duration
2626

27-
class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds: String?, sourceOperations: BigQuerySourceOperations? = BigQuerySourceOperations()) : SqlDatabase() {
27+
class BigQueryDatabase
28+
@JvmOverloads
29+
constructor(
30+
projectId: String?,
31+
jsonCreds: String?,
32+
sourceOperations: BigQuerySourceOperations? = BigQuerySourceOperations()
33+
) : SqlDatabase() {
2834
var bigQuery: BigQuery? = null
2935
private var sourceOperations: BigQuerySourceOperations? = null
3036

@@ -34,19 +40,28 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
3440
val bigQueryBuilder = BigQueryOptions.newBuilder()
3541
var credentials: ServiceAccountCredentials? = null
3642
if (jsonCreds != null && !jsonCreds.isEmpty()) {
37-
credentials = ServiceAccountCredentials
38-
.fromStream(ByteArrayInputStream(jsonCreds.toByteArray(Charsets.UTF_8)))
43+
credentials =
44+
ServiceAccountCredentials.fromStream(
45+
ByteArrayInputStream(jsonCreds.toByteArray(Charsets.UTF_8))
46+
)
3947
}
40-
bigQuery = bigQueryBuilder
48+
bigQuery =
49+
bigQueryBuilder
4150
.setProjectId(projectId)
42-
.setCredentials(if (!Objects.isNull(credentials)) credentials else ServiceAccountCredentials.getApplicationDefault())
43-
.setHeaderProvider { ImmutableMap.of("user-agent", getUserAgentHeader(connectorVersion)) }
44-
.setRetrySettings(RetrySettings
45-
.newBuilder()
51+
.setCredentials(
52+
if (!Objects.isNull(credentials)) credentials
53+
else ServiceAccountCredentials.getApplicationDefault()
54+
)
55+
.setHeaderProvider {
56+
ImmutableMap.of("user-agent", getUserAgentHeader(connectorVersion))
57+
}
58+
.setRetrySettings(
59+
RetrySettings.newBuilder()
4660
.setMaxAttempts(10)
4761
.setRetryDelayMultiplier(1.5)
4862
.setTotalTimeout(Duration.ofMinutes(60))
49-
.build())
63+
.build()
64+
)
5065
.build()
5166
.service
5267
} catch (e: IOException) {
@@ -59,31 +74,41 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
5974
}
6075

6176
private val connectorVersion: String
62-
get() = Optional.ofNullable(System.getenv("WORKER_CONNECTOR_IMAGE"))
77+
get() =
78+
Optional.ofNullable(System.getenv("WORKER_CONNECTOR_IMAGE"))
6379
.orElse(StringUtils.EMPTY)
64-
.replace("airbyte/", StringUtils.EMPTY).replace(":", "/")
80+
.replace("airbyte/", StringUtils.EMPTY)
81+
.replace(":", "/")
6582

6683
@Throws(SQLException::class)
6784
override fun execute(sql: String?) {
6885
val result = executeQuery(bigQuery, getQueryConfig(sql, emptyList()))
6986
if (result.getLeft() == null) {
70-
throw SQLException("BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql)
87+
throw SQLException(
88+
"BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql
89+
)
7190
}
7291
LOGGER.info("BigQuery successfully finished execution SQL: $sql")
7392
}
7493

7594
@Throws(Exception::class)
76-
fun query(sql: String?, vararg params: QueryParameterValue?): Stream<JsonNode> {
77-
return query(sql, (if (params == null) emptyList() else Arrays.asList(*params)))
95+
fun query(sql: String?, vararg params: QueryParameterValue): Stream<JsonNode> {
96+
return query(sql, (if (params == null) emptyList() else Arrays.asList(*params).toList()))
7897
}
7998

8099
@Throws(Exception::class)
81-
override fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode> {
82-
val parameterValueList = if (params == null) emptyList()
83-
else Arrays.stream(params).map { param: String? ->
84-
QueryParameterValue.newBuilder().setValue(param).setType(
85-
StandardSQLTypeName.STRING).build()
86-
}.collect(Collectors.toList())
100+
override fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode> {
101+
val parameterValueList =
102+
if (params == null) emptyList()
103+
else
104+
Arrays.stream(params)
105+
.map { param: String? ->
106+
QueryParameterValue.newBuilder()
107+
.setValue(param)
108+
.setType(StandardSQLTypeName.STRING)
109+
.build()
110+
}
111+
.collect(Collectors.toList())
87112

88113
return query(sql, parameterValueList)
89114
}
@@ -95,21 +120,31 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
95120

96121
if (result.getLeft() != null) {
97122
val fieldList = result.getLeft()!!.getQueryResults().schema.fields
98-
return Streams.stream(result.getLeft()!!.getQueryResults().iterateAll())
99-
.map { fieldValues: FieldValueList -> sourceOperations!!.rowToJson(BigQueryResultSet(fieldValues, fieldList)) }
100-
} else throw Exception(
101-
"Failed to execute query " + sql + (if (params != null && !params.isEmpty()) " with params $params" else "") + ". Error: " + result.getRight())
123+
return Streams.stream(result.getLeft()!!.getQueryResults().iterateAll()).map {
124+
fieldValues: FieldValueList ->
125+
sourceOperations!!.rowToJson(BigQueryResultSet(fieldValues, fieldList))
126+
}
127+
} else
128+
throw Exception(
129+
"Failed to execute query " +
130+
sql +
131+
(if (params != null && !params.isEmpty()) " with params $params" else "") +
132+
". Error: " +
133+
result.getRight()
134+
)
102135
}
103136

104137
fun getQueryConfig(sql: String?, params: List<QueryParameterValue>?): QueryJobConfiguration {
105-
return QueryJobConfiguration
106-
.newBuilder(sql)
107-
.setUseLegacySql(false)
108-
.setPositionalParameters(params)
109-
.build()
138+
return QueryJobConfiguration.newBuilder(sql)
139+
.setUseLegacySql(false)
140+
.setPositionalParameters(params)
141+
.build()
110142
}
111143

112-
fun executeQuery(bigquery: BigQuery?, queryConfig: QueryJobConfiguration?): ImmutablePair<Job?, String?> {
144+
fun executeQuery(
145+
bigquery: BigQuery?,
146+
queryConfig: QueryJobConfiguration?
147+
): ImmutablePair<Job?, String?> {
113148
val jobId = JobId.of(UUID.randomUUID().toString())
114149
val queryJob = bigquery!!.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
115150
return executeQuery(queryJob)
@@ -123,13 +158,21 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
123158
*/
124159
fun getProjectTables(projectId: String?): List<Table> {
125160
val tableList: MutableList<Table> = ArrayList()
126-
bigQuery!!.listDatasets(projectId)
127-
.iterateAll()
128-
.forEach(Consumer { dataset: Dataset ->
129-
bigQuery!!.listTables(dataset.datasetId)
130-
.iterateAll()
131-
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
132-
})
161+
bigQuery!!
162+
.listDatasets(projectId)
163+
.iterateAll()
164+
.forEach(
165+
Consumer { dataset: Dataset ->
166+
bigQuery!!
167+
.listTables(dataset.datasetId)
168+
.iterateAll()
169+
.forEach(
170+
Consumer { table: Table ->
171+
tableList.add(bigQuery!!.getTable(table.tableId))
172+
}
173+
)
174+
}
175+
)
133176
return tableList
134177
}
135178

@@ -141,9 +184,10 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
141184
*/
142185
fun getDatasetTables(datasetId: String?): List<Table> {
143186
val tableList: MutableList<Table> = ArrayList()
144-
bigQuery!!.listTables(datasetId)
145-
.iterateAll()
146-
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
187+
bigQuery!!
188+
.listTables(datasetId)
189+
.iterateAll()
190+
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
147191
return tableList
148192
}
149193

0 commit comments

Comments
 (0)