Skip to content

Commit 1de50aa

Browse files
xiaohansongMarius Posta
and
Marius Posta
authored
[source-mysqlv2] A new mysqlv2 (#44606)
Co-authored-by: Marius Posta <[email protected]>
1 parent 081a0ca commit 1de50aa

19 files changed

+1841
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
plugins {
2+
id 'airbyte-bulk-connector'
3+
}
4+
5+
application {
6+
mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource'
7+
}
8+
9+
airbyteBulkConnector {
10+
core = 'extract'
11+
toolkits = ['extract-jdbc']
12+
cdk = 'local'
13+
}
14+
15+
dependencies {
16+
implementation 'mysql:mysql-connector-java:8.0.30'
17+
implementation 'org.bouncycastle:bcpkix-jdk18on:1.77'
18+
implementation 'org.bouncycastle:bcprov-jdk18on:1.77'
19+
implementation 'org.bouncycastle:bctls-jdk18on:1.77'
20+
21+
testImplementation platform('org.testcontainers:testcontainers-bom:1.19.8')
22+
testImplementation 'org.testcontainers:mysql'
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
testExecutionConcurrency=1
2+
JunitMethodExecutionTimeout=5m
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
data:
2+
ab_internal:
3+
ql: 200
4+
sl: 100
5+
allowedHosts:
6+
hosts:
7+
- ${host}
8+
- ${tunnel_method.tunnel_host}
9+
connectorSubtype: database
10+
connectorType: source
11+
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
12+
dockerImageTag: 0.0.1
13+
dockerRepository: airbyte/source-mysql-v2
14+
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
15+
githubIssueLabel: source-mysql-v2
16+
icon: mysql.svg
17+
license: ELv2
18+
name: Mysqlv2 Source
19+
registryOverrides:
20+
cloud:
21+
enabled: false
22+
oss:
23+
enabled: false
24+
releaseStage: alpha
25+
supportLevel: archived
26+
tags:
27+
- language:java
28+
metadataSpecVersion: "1.0"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mysql
6+
7+
import io.airbyte.cdk.read.JdbcSelectQuerier
8+
import io.airbyte.cdk.read.SelectQuerier
9+
import io.airbyte.cdk.read.SelectQuery
10+
import io.github.oshai.kotlinlogging.KotlinLogging
11+
import io.micronaut.context.annotation.Primary
12+
import javax.inject.Singleton
13+
14+
private val log = KotlinLogging.logger {}
15+
16+
@Singleton
17+
@Primary
18+
class MysqlJdbcSelectQuerier(val base: JdbcSelectQuerier) : SelectQuerier by base {
19+
20+
override fun executeQuery(
21+
q: SelectQuery,
22+
parameters: SelectQuerier.Parameters
23+
): SelectQuerier.Result {
24+
log.info { "Executing query: ${q.sql}" }
25+
return base.executeQuery(q, SelectQuerier.Parameters(fetchSize = Int.MIN_VALUE))
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.mysql
3+
4+
import io.airbyte.cdk.AirbyteSourceRunner
5+
6+
object MysqlSource {
7+
@JvmStatic
8+
fun main(args: Array<String>) {
9+
AirbyteSourceRunner.run(*args)
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.mysql
3+
4+
import io.airbyte.cdk.ConfigErrorException
5+
import io.airbyte.cdk.command.JdbcSourceConfiguration
6+
import io.airbyte.cdk.command.SourceConfiguration
7+
import io.airbyte.cdk.command.SourceConfigurationFactory
8+
import io.airbyte.cdk.ssh.SshConnectionOptions
9+
import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
10+
import io.github.oshai.kotlinlogging.KotlinLogging
11+
import jakarta.inject.Singleton
12+
import java.net.URLDecoder
13+
import java.nio.charset.StandardCharsets
14+
import java.time.Duration
15+
16+
private val log = KotlinLogging.logger {}
17+
18+
/** Mysql-specific implementation of [SourceConfiguration] */
19+
data class MysqlSourceConfiguration(
20+
override val realHost: String,
21+
override val realPort: Int,
22+
override val sshTunnel: SshTunnelMethodConfiguration,
23+
override val sshConnectionOptions: SshConnectionOptions,
24+
override val jdbcUrlFmt: String,
25+
override val jdbcProperties: Map<String, String>,
26+
override val namespaces: Set<String>,
27+
val cursorConfiguration: CursorConfiguration,
28+
override val maxConcurrency: Int,
29+
override val resourceAcquisitionHeartbeat: Duration = Duration.ofMillis(100L),
30+
override val checkpointTargetInterval: Duration,
31+
override val checkPrivileges: Boolean,
32+
) : JdbcSourceConfiguration {
33+
override val global = cursorConfiguration is CdcCursor
34+
}
35+
36+
@Singleton
37+
class MysqlSourceConfigurationFactory :
38+
SourceConfigurationFactory<MysqlSourceConfigurationJsonObject, MysqlSourceConfiguration> {
39+
override fun makeWithoutExceptionHandling(
40+
pojo: MysqlSourceConfigurationJsonObject,
41+
): MysqlSourceConfiguration {
42+
val realHost: String = pojo.host
43+
val realPort: Int = pojo.port
44+
val sshTunnel: SshTunnelMethodConfiguration = pojo.getTunnelMethodValue()
45+
val jdbcProperties = mutableMapOf<String, String>()
46+
jdbcProperties["user"] = pojo.username
47+
pojo.password?.let { jdbcProperties["password"] = it }
48+
49+
// Parse URL parameters.
50+
val pattern = "^([^=]+)=(.*)$".toRegex()
51+
for (pair in (pojo.jdbcUrlParams ?: "").trim().split("&".toRegex())) {
52+
if (pair.isBlank()) {
53+
continue
54+
}
55+
val result: MatchResult? = pattern.matchEntire(pair)
56+
if (result == null) {
57+
log.warn { "ignoring invalid JDBC URL param '$pair'" }
58+
} else {
59+
val key: String = result.groupValues[1].trim()
60+
val urlEncodedValue: String = result.groupValues[2].trim()
61+
jdbcProperties[key] = URLDecoder.decode(urlEncodedValue, StandardCharsets.UTF_8)
62+
}
63+
}
64+
// Determine protocol and configure encryption.
65+
val encryption: Encryption = pojo.getEncryptionValue()
66+
if (encryption is SslVerifyCertificate) {
67+
// TODO: reuse JdbcSSLCOnnectionUtils; parse the input into properties
68+
}
69+
// Build JDBC URL
70+
val address = "%s:%d"
71+
val jdbcUrlFmt = "jdbc:mysql://${address}"
72+
jdbcProperties["useCursorFetch"] = "true"
73+
jdbcProperties["sessionVariables"] = "autocommit=0"
74+
val defaultSchema: String = pojo.username.uppercase()
75+
val sshOpts = SshConnectionOptions.fromAdditionalProperties(pojo.getAdditionalProperties())
76+
val checkpointTargetInterval: Duration =
77+
Duration.ofSeconds(pojo.checkpointTargetIntervalSeconds?.toLong() ?: 0)
78+
if (!checkpointTargetInterval.isPositive) {
79+
throw ConfigErrorException("Checkpoint Target Interval should be positive")
80+
}
81+
val maxConcurrency: Int = pojo.concurrency ?: 0
82+
if ((pojo.concurrency ?: 0) <= 0) {
83+
throw ConfigErrorException("Concurrency setting should be positive")
84+
}
85+
return MysqlSourceConfiguration(
86+
realHost = realHost,
87+
realPort = realPort,
88+
sshTunnel = sshTunnel,
89+
sshConnectionOptions = sshOpts,
90+
jdbcUrlFmt = jdbcUrlFmt,
91+
jdbcProperties = jdbcProperties,
92+
namespaces = pojo.schemas?.toSet() ?: setOf(defaultSchema),
93+
cursorConfiguration = pojo.getCursorConfigurationValue(),
94+
checkpointTargetInterval = checkpointTargetInterval,
95+
maxConcurrency = maxConcurrency,
96+
checkPrivileges = pojo.checkPrivileges ?: true,
97+
)
98+
}
99+
}

0 commit comments

Comments
 (0)