Skip to content

Commit da88158

Browse files
🎉 New SFTP source connector (#13120)
* 11797 New SFTP source * 11797 Bump sftp source version * auto-bump connector version Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 83b9b4f commit da88158

File tree

28 files changed

+1327
-0
lines changed

28 files changed

+1327
-0
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,3 +1051,10 @@
10511051
documentationUrl: https://docs.airbyte.com/integrations/sources/zoho-crm
10521052
sourceType: api
10531053
releaseStage: alpha
1054+
- name: SFTP
1055+
sourceDefinitionId: a827c52e-791c-4135-a245-e233c5255199
1056+
dockerRepository: airbyte/source-sftp
1057+
dockerImageTag: 0.1.1
1058+
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp
1059+
sourceType: api
1060+
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9986,3 +9986,107 @@
99869986
supportsNormalization: false
99879987
supportsDBT: false
99889988
supported_destination_sync_modes: []
9989+
- dockerImage: "airbyte/source-sftp:0.1.1"
9990+
spec:
9991+
documentationUrl: "https://docs.airbyte.io/integrations/sources/sftp"
9992+
connectionSpecification:
9993+
$schema: "http://json-schema.org/draft-07/schema#"
9994+
title: "SFTP Source Spec"
9995+
type: "object"
9996+
required:
9997+
- "user"
9998+
- "host"
9999+
- "port"
10000+
additionalProperties: true
10001+
properties:
10002+
user:
10003+
title: "User Name"
10004+
description: "The server user"
10005+
type: "string"
10006+
order: 0
10007+
host:
10008+
title: "Host Address"
10009+
description: "The server host address"
10010+
type: "string"
10011+
examples:
10012+
- "www.host.com"
10013+
- "192.0.2.1"
10014+
order: 1
10015+
port:
10016+
title: "Port"
10017+
description: "The server port"
10018+
type: "integer"
10019+
default: 22
10020+
examples:
10021+
- "22"
10022+
order: 2
10023+
credentials:
10024+
type: "object"
10025+
title: "Authentication *"
10026+
description: "The server authentication method"
10027+
order: 3
10028+
oneOf:
10029+
- title: "Password Authentication"
10030+
required:
10031+
- "auth_method"
10032+
- "auth_user_password"
10033+
properties:
10034+
auth_method:
10035+
description: "Connect through password authentication"
10036+
type: "string"
10037+
const: "SSH_PASSWORD_AUTH"
10038+
order: 0
10039+
auth_user_password:
10040+
title: "Password"
10041+
description: "OS-level password for logging into the SFTP server host"
10042+
type: "string"
10043+
airbyte_secret: true
10044+
order: 1
10045+
- title: "SSH Key Authentication"
10046+
required:
10047+
- "auth_method"
10048+
- "auth_ssh_key"
10049+
properties:
10050+
auth_method:
10051+
description: "Connect through ssh key"
10052+
type: "string"
10053+
const: "SSH_KEY_AUTH"
10054+
order: 0
10055+
auth_ssh_key:
10056+
title: "SSH Private Key"
10057+
description: "OS-level user account ssh key credentials in RSA PEM\
10058+
\ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )"
10059+
type: "string"
10060+
airbyte_secret: true
10061+
multiline: true
10062+
order: 1
10063+
file_types:
10064+
title: "File types"
10065+
description: "Comma-separated file types. Currently only 'csv' and 'json'\
10066+
\ types are supported."
10067+
type: "string"
10068+
default: "csv,json"
10069+
order: 4
10070+
examples:
10071+
- "csv,json"
10072+
- "csv"
10073+
folder_path:
10074+
title: "Folder Path (Optional)"
10075+
description: "The directory to search files for sync"
10076+
type: "string"
10077+
default: ""
10078+
examples:
10079+
- "/logs/2022"
10080+
order: 5
10081+
file_pattern:
10082+
title: "File Pattern (Optional)"
10083+
description: "The regular expression to specify files for sync in a chosen\
10084+
\ Folder Path"
10085+
type: "string"
10086+
default: ""
10087+
examples:
10088+
- "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which match `log-yearmmdd`"
10089+
order: 6
10090+
supportsNormalization: false
10091+
supportsDBT: false
10092+
supported_destination_sync_modes: []
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*
2+
!Dockerfile
3+
!build
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM airbyte/integration-base-java:dev AS build
2+
3+
WORKDIR /airbyte
4+
ENV APPLICATION source-sftp
5+
6+
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
7+
8+
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
9+
10+
FROM airbyte/integration-base-java:dev
11+
12+
WORKDIR /airbyte
13+
ENV APPLICATION source-sftp
14+
15+
COPY --from=build /airbyte /airbyte
16+
17+
LABEL io.airbyte.version=0.1.1
18+
LABEL io.airbyte.name=airbyte/source-sftp
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Source Sftp
2+
3+
This is the repository for the Sftp source connector in Java.
4+
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/sources/sftp).
5+
6+
## Local development
7+
8+
#### Building via Gradle
9+
From the Airbyte repository root, run:
10+
```
11+
./gradlew :airbyte-integrations:connectors:source-sftp:build
12+
```
13+
14+
#### Create credentials
15+
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
16+
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
17+
18+
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
19+
20+
### Locally running the connector docker image
21+
22+
#### Build
23+
Build the connector image via Gradle:
24+
```
25+
./gradlew :airbyte-integrations:connectors:source-sftp:airbyteDocker
26+
```
27+
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
28+
the Dockerfile.
29+
30+
#### Run
31+
Then run any of the connector commands as follows:
32+
```
33+
docker run --rm airbyte/source-sftp:dev spec
34+
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-sftp:dev check --config /secrets/config.json
35+
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-sftp:dev discover --config /secrets/config.json
36+
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-sftp:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
37+
```
38+
39+
## Testing
40+
We use `JUnit` for Java tests.
41+
42+
### Unit and Integration Tests
43+
Place unit tests under `src/test/java/io/airbyte/integrations/source/sftp`.
44+
45+
#### Acceptance Tests
46+
Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in
47+
`src/test-integration/java/io/airbyte/integrations/source/sftp/SftpSourceAcceptanceTest.java`.
48+
49+
### Using gradle to run tests
50+
All commands should be run from airbyte project root.
51+
To run unit tests:
52+
```
53+
./gradlew :airbyte-integrations:connectors:source-sftp:unitTest
54+
```
55+
To run acceptance and custom integration tests:
56+
```
57+
./gradlew :airbyte-integrations:connectors:source-sftp:integrationTest
58+
```
59+
60+
## Dependency Management
61+
62+
### Publishing a new version of the connector
63+
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
64+
1. Make sure your changes are passing unit and integration tests.
65+
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
66+
1. Create a Pull Request.
67+
1. Pat yourself on the back for being an awesome contributor.
68+
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
plugins {
2+
id 'application'
3+
id 'airbyte-docker'
4+
id 'airbyte-integration-test-java'
5+
}
6+
7+
application {
8+
mainClass = 'io.airbyte.integrations.source.sftp.SftpSource'
9+
}
10+
11+
dependencies {
12+
implementation project(':airbyte-config:models')
13+
implementation project(':airbyte-protocol:models')
14+
implementation project(':airbyte-integrations:bases:base-java')
15+
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
16+
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.13.2'
17+
implementation 'com.jcraft:jsch:0.1.55'
18+
19+
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
20+
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-sftp')
21+
integrationTestJavaImplementation "org.testcontainers:testcontainers:1.15.3"
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package io.airbyte.integrations.source.sftp;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.jcraft.jsch.Channel;
5+
import com.jcraft.jsch.ChannelSftp;
6+
import com.jcraft.jsch.JSch;
7+
import com.jcraft.jsch.JSchException;
8+
import com.jcraft.jsch.Session;
9+
import com.jcraft.jsch.SftpException;
10+
import io.airbyte.integrations.source.sftp.enums.SftpAuthMethod;
11+
import io.airbyte.integrations.source.sftp.enums.SupportedFileExtension;
12+
import org.apache.commons.io.IOUtils;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.io.ByteArrayInputStream;
17+
import java.io.File;
18+
import java.io.FileOutputStream;
19+
import java.io.InputStream;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Properties;
22+
import java.util.Vector;
23+
24+
public class SftpClient {
25+
26+
protected static final Logger LOGGER = LoggerFactory.getLogger(SftpClient.class);
27+
private static final String CHANNEL_SFTP = "sftp";
28+
private static final String STRICT_HOST_KEY_CHECKING = "StrictHostKeyChecking";
29+
30+
private final String username;
31+
private final String hostAddress;
32+
private final int port;
33+
private final SftpAuthMethod authMethod;
34+
private final JsonNode config;
35+
private final int connectionTimeoutMillis = 60000;
36+
private final JSch jsch;
37+
private Session session;
38+
private ChannelSftp channelSftp;
39+
40+
public SftpClient(JsonNode config) {
41+
this.config = config;
42+
jsch = new JSch();
43+
username = config.has("user") ? config.get("user").asText() : "";
44+
hostAddress = config.has("host") ? config.get("host").asText() : "";
45+
port = config.has("port") ? config.get("port").asInt() : 22;
46+
authMethod = SftpAuthMethod.valueOf(config.get("credentials").get("auth_method").asText());
47+
}
48+
49+
public void connect() {
50+
try {
51+
LOGGER.info("Connecting to the server");
52+
configureSession();
53+
configureAuthMethod();
54+
LOGGER.debug("Connecting to host: {} at port: {}", hostAddress, port);
55+
session.connect();
56+
Channel channel = session.openChannel(CHANNEL_SFTP);
57+
channel.connect();
58+
59+
channelSftp = (ChannelSftp) channel;
60+
LOGGER.info("Connected successfully");
61+
} catch (Exception e) {
62+
LOGGER.error("Exception attempting to connect to the server:", e);
63+
throw new RuntimeException(e);
64+
}
65+
}
66+
67+
private void configureSession() throws JSchException {
68+
Properties properties = new Properties();
69+
properties.put(STRICT_HOST_KEY_CHECKING, "no");
70+
session = jsch.getSession(username, hostAddress, port);
71+
session.setConfig(properties);
72+
session.setTimeout(connectionTimeoutMillis);
73+
}
74+
75+
public void disconnect() {
76+
LOGGER.info("Disconnecting from the server");
77+
if (channelSftp != null) {
78+
channelSftp.disconnect();
79+
}
80+
if (session != null) {
81+
session.disconnect();
82+
}
83+
LOGGER.info("Disconnected successfully");
84+
}
85+
86+
public boolean isConnected() {
87+
return channelSftp != null && channelSftp.isConnected();
88+
}
89+
90+
public Vector lsFile(SupportedFileExtension fileExtension) {
91+
try {
92+
return channelSftp.ls("*." + fileExtension.typeName);
93+
} catch (SftpException e) {
94+
LOGGER.error("Exception occurred while trying to find files with type {} : ", fileExtension, e);
95+
throw new RuntimeException(e);
96+
}
97+
}
98+
99+
public void changeWorkingDirectory(String path) throws SftpException {
100+
channelSftp.cd(path);
101+
}
102+
103+
public ByteArrayInputStream getFile(String fileName) {
104+
try (InputStream inputStream = channelSftp.get(fileName)) {
105+
return new ByteArrayInputStream(IOUtils.toByteArray(inputStream));
106+
} catch (Exception e) {
107+
LOGGER.error("Exception occurred while trying to download file {} : ", fileName, e);
108+
throw new RuntimeException(e);
109+
}
110+
}
111+
112+
private void configureAuthMethod() throws Exception {
113+
switch (authMethod) {
114+
case SSH_PASSWORD_AUTH -> session.setPassword(config.get("credentials").get("auth_user_password").asText());
115+
case SSH_KEY_AUTH -> {
116+
File tempFile = File.createTempFile("private_key", "", null);
117+
FileOutputStream fos = new FileOutputStream(tempFile);
118+
fos.write(config.get("credentials").get("auth_ssh_key").asText().getBytes(StandardCharsets.UTF_8));
119+
jsch.addIdentity(tempFile.getAbsolutePath());
120+
}
121+
default -> throw new RuntimeException("Unsupported SFTP Authentication type");
122+
}
123+
}
124+
125+
}

0 commit comments

Comments
 (0)