@@ -11,21 +11,27 @@ import com.google.common.base.Charsets
11
11
import com.google.common.collect.ImmutableMap
12
12
import com.google.common.collect.Streams
13
13
import io.airbyte.cdk.db.SqlDatabase
14
- import org.apache.commons.lang3.StringUtils
15
- import org.apache.commons.lang3.tuple.ImmutablePair
16
- import org.slf4j.Logger
17
- import org.slf4j.LoggerFactory
18
- import org.threeten.bp.Duration
19
14
import java.io.ByteArrayInputStream
20
15
import java.io.IOException
21
16
import java.sql.SQLException
22
17
import java.util.*
23
18
import java.util.function.Consumer
24
19
import java.util.stream.Collectors
25
20
import java.util.stream.Stream
21
+ import org.apache.commons.lang3.StringUtils
22
+ import org.apache.commons.lang3.tuple.ImmutablePair
23
+ import org.slf4j.Logger
24
+ import org.slf4j.LoggerFactory
25
+ import org.threeten.bp.Duration
26
26
27
- class BigQueryDatabase @JvmOverloads constructor(projectId : String? , jsonCreds : String? , sourceOperations : BigQuerySourceOperations ? = BigQuerySourceOperations ()) : SqlDatabase() {
28
- var bigQuery: BigQuery ? = null
27
+ class BigQueryDatabase
28
+ @JvmOverloads
29
+ constructor (
30
+ projectId: String? ,
31
+ jsonCreds: String? ,
32
+ sourceOperations: BigQuerySourceOperations ? = BigQuerySourceOperations ()
33
+ ) : SqlDatabase () {
34
+ var bigQuery: BigQuery
29
35
private var sourceOperations: BigQuerySourceOperations ? = null
30
36
31
37
init {
@@ -34,19 +40,28 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
34
40
val bigQueryBuilder = BigQueryOptions .newBuilder()
35
41
var credentials: ServiceAccountCredentials ? = null
36
42
if (jsonCreds != null && ! jsonCreds.isEmpty()) {
37
- credentials = ServiceAccountCredentials
38
- .fromStream(ByteArrayInputStream (jsonCreds.toByteArray(Charsets .UTF_8 )))
43
+ credentials =
44
+ ServiceAccountCredentials .fromStream(
45
+ ByteArrayInputStream (jsonCreds.toByteArray(Charsets .UTF_8 ))
46
+ )
39
47
}
40
- bigQuery = bigQueryBuilder
48
+ bigQuery =
49
+ bigQueryBuilder
41
50
.setProjectId(projectId)
42
- .setCredentials(if (! Objects .isNull(credentials)) credentials else ServiceAccountCredentials .getApplicationDefault())
43
- .setHeaderProvider { ImmutableMap .of(" user-agent" , getUserAgentHeader(connectorVersion)) }
44
- .setRetrySettings(RetrySettings
45
- .newBuilder()
51
+ .setCredentials(
52
+ if (! Objects .isNull(credentials)) credentials
53
+ else ServiceAccountCredentials .getApplicationDefault()
54
+ )
55
+ .setHeaderProvider {
56
+ ImmutableMap .of(" user-agent" , getUserAgentHeader(connectorVersion))
57
+ }
58
+ .setRetrySettings(
59
+ RetrySettings .newBuilder()
46
60
.setMaxAttempts(10 )
47
61
.setRetryDelayMultiplier(1.5 )
48
62
.setTotalTimeout(Duration .ofMinutes(60 ))
49
- .build())
63
+ .build()
64
+ )
50
65
.build()
51
66
.service
52
67
} catch (e: IOException ) {
@@ -59,31 +74,41 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
59
74
}
60
75
61
76
private val connectorVersion: String
62
- get() = Optional .ofNullable(System .getenv(" WORKER_CONNECTOR_IMAGE" ))
77
+ get() =
78
+ Optional .ofNullable(System .getenv(" WORKER_CONNECTOR_IMAGE" ))
63
79
.orElse(StringUtils .EMPTY )
64
- .replace(" airbyte/" , StringUtils .EMPTY ).replace(" :" , " /" )
80
+ .replace(" airbyte/" , StringUtils .EMPTY )
81
+ .replace(" :" , " /" )
65
82
66
83
@Throws(SQLException ::class )
67
84
override fun execute (sql : String? ) {
68
85
val result = executeQuery(bigQuery, getQueryConfig(sql, emptyList()))
69
86
if (result.getLeft() == null ) {
70
- throw SQLException (" BigQuery request is failed with error: " + result.getRight() + " . SQL: " + sql)
87
+ throw SQLException (
88
+ " BigQuery request is failed with error: " + result.getRight() + " . SQL: " + sql
89
+ )
71
90
}
72
91
LOGGER .info(" BigQuery successfully finished execution SQL: $sql " )
73
92
}
74
93
75
94
@Throws(Exception ::class )
76
- fun query (sql : String? , vararg params : QueryParameterValue ? ): Stream <JsonNode > {
77
- return query(sql, (if (params == null ) emptyList() else Arrays .asList(* params)))
95
+ fun query (sql : String? , vararg params : QueryParameterValue ): Stream <JsonNode > {
96
+ return query(sql, (if (params == null ) emptyList() else Arrays .asList(* params).toList() ))
78
97
}
79
98
80
99
@Throws(Exception ::class )
81
- override fun unsafeQuery (sql : String? , vararg params : String ): Stream <JsonNode > {
82
- val parameterValueList = if (params == null ) emptyList()
83
- else Arrays .stream(params).map { param: String? ->
84
- QueryParameterValue .newBuilder().setValue(param).setType(
85
- StandardSQLTypeName .STRING ).build()
86
- }.collect(Collectors .toList())
100
+ override fun unsafeQuery (sql : String? , vararg params : String? ): Stream <JsonNode > {
101
+ val parameterValueList =
102
+ if (params == null ) emptyList()
103
+ else
104
+ Arrays .stream(params)
105
+ .map { param: String? ->
106
+ QueryParameterValue .newBuilder()
107
+ .setValue(param)
108
+ .setType(StandardSQLTypeName .STRING )
109
+ .build()
110
+ }
111
+ .collect(Collectors .toList())
87
112
88
113
return query(sql, parameterValueList)
89
114
}
@@ -95,23 +120,33 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
95
120
96
121
if (result.getLeft() != null ) {
97
122
val fieldList = result.getLeft()!! .getQueryResults().schema.fields
98
- return Streams .stream(result.getLeft()!! .getQueryResults().iterateAll())
99
- .map { fieldValues: FieldValueList -> sourceOperations!! .rowToJson(BigQueryResultSet (fieldValues, fieldList)) }
100
- } else throw Exception (
101
- " Failed to execute query " + sql + (if (params != null && ! params.isEmpty()) " with params $params " else " " ) + " . Error: " + result.getRight())
123
+ return Streams .stream(result.getLeft()!! .getQueryResults().iterateAll()).map {
124
+ fieldValues: FieldValueList ->
125
+ sourceOperations!! .rowToJson(BigQueryResultSet (fieldValues, fieldList))
126
+ }
127
+ } else
128
+ throw Exception (
129
+ " Failed to execute query " +
130
+ sql +
131
+ (if (params != null && ! params.isEmpty()) " with params $params " else " " ) +
132
+ " . Error: " +
133
+ result.getRight()
134
+ )
102
135
}
103
136
104
137
fun getQueryConfig (sql : String? , params : List <QueryParameterValue >? ): QueryJobConfiguration {
105
- return QueryJobConfiguration
106
- .newBuilder(sql)
107
- .setUseLegacySql(false )
108
- .setPositionalParameters(params)
109
- .build()
138
+ return QueryJobConfiguration .newBuilder(sql)
139
+ .setUseLegacySql(false )
140
+ .setPositionalParameters(params)
141
+ .build()
110
142
}
111
143
112
- fun executeQuery (bigquery : BigQuery ? , queryConfig : QueryJobConfiguration ? ): ImmutablePair <Job ?, String ?> {
144
+ fun executeQuery (
145
+ bigquery : BigQuery ,
146
+ queryConfig : QueryJobConfiguration ?
147
+ ): ImmutablePair <Job ?, String ?> {
113
148
val jobId = JobId .of(UUID .randomUUID().toString())
114
- val queryJob = bigquery!! .create(JobInfo .newBuilder(queryConfig).setJobId(jobId).build())
149
+ val queryJob = bigquery.create(JobInfo .newBuilder(queryConfig).setJobId(jobId).build())
115
150
return executeQuery(queryJob)
116
151
}
117
152
@@ -123,13 +158,21 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
123
158
*/
124
159
fun getProjectTables (projectId : String? ): List <Table > {
125
160
val tableList: MutableList <Table > = ArrayList ()
126
- bigQuery!! .listDatasets(projectId)
127
- .iterateAll()
128
- .forEach(Consumer { dataset: Dataset ->
129
- bigQuery!! .listTables(dataset.datasetId)
130
- .iterateAll()
131
- .forEach(Consumer { table: Table -> tableList.add(bigQuery!! .getTable(table.tableId)) })
132
- })
161
+ bigQuery!!
162
+ .listDatasets(projectId)
163
+ .iterateAll()
164
+ .forEach(
165
+ Consumer { dataset: Dataset ->
166
+ bigQuery!!
167
+ .listTables(dataset.datasetId)
168
+ .iterateAll()
169
+ .forEach(
170
+ Consumer { table: Table ->
171
+ tableList.add(bigQuery!! .getTable(table.tableId))
172
+ }
173
+ )
174
+ }
175
+ )
133
176
return tableList
134
177
}
135
178
@@ -141,9 +184,10 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
141
184
*/
142
185
fun getDatasetTables (datasetId : String? ): List <Table > {
143
186
val tableList: MutableList <Table > = ArrayList ()
144
- bigQuery!! .listTables(datasetId)
145
- .iterateAll()
146
- .forEach(Consumer { table: Table -> tableList.add(bigQuery!! .getTable(table.tableId)) })
187
+ bigQuery!!
188
+ .listTables(datasetId)
189
+ .iterateAll()
190
+ .forEach(Consumer { table: Table -> tableList.add(bigQuery!! .getTable(table.tableId)) })
147
191
return tableList
148
192
}
149
193
0 commit comments