Skip to content

Add Singer infra #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
.git
.gradle
.idea
**/build
**/node_modules
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
VERSION=0.1.0
ENV=docker
ENV=docker
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {

Properties env = new Properties()
File envFile = new File('.env')
envFile.withInputStream {env.load(it) }
envFile.withInputStream { env.load(it) }

if (!env.containsKey("VERSION")) {
throw new Exception("Version not specified in .env file...")
Expand Down Expand Up @@ -39,6 +39,10 @@ subprojects {

test {
useJUnitPlatform()
testLogging() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if tests fail we want to see the reasons on the CLI

events "failed"
exceptionFormat "full"
}
}

dependencies {
Expand Down
12 changes: 12 additions & 0 deletions dataline-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,15 @@ plugins {
id 'java-library'
}

configurations {
jdbc
}

dependencies {
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.9.8"

testImplementation "org.postgresql:postgresql:42.1.4"
testImplementation "org.testcontainers:testcontainers:1.14.3"
testImplementation "org.testcontainers:postgresql:1.14.3"
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static io.dataline.workers.JobStatus.*;

import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
import java.io.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,16 +20,36 @@ public abstract class BaseSingerWorker<OutputType> implements Worker<OutputType>
protected JobStatus jobStatus;
protected String workerId;
protected Process workerProcess;
protected final Path workspacePath;

private final String workspaceRoot;
private final String singerRoot;

protected BaseSingerWorker(String workerId, String workspaceRoot, String singerRoot) {
this.workerId = workerId;
this.workspaceRoot = workspaceRoot;
this.workspacePath = Path.of(workspaceRoot, workerId);
this.singerRoot = singerRoot;
}

@Override
public OutputAndStatus<OutputType> run() {
createWorkspace();
try {
return runInternal();
} finally {
}
}

public abstract OutputAndStatus<OutputType> runInternal();

private void createWorkspace() {
try {
FileUtils.forceMkdir(workspacePath.toFile());
} catch (IOException e) {
LOGGER.error("Unable to create workspace for worker {} due to exception {} ", workerId, e);
throw new RuntimeException(e);
}
}

@Override
public void cancel() {
try {
Expand All @@ -43,7 +65,7 @@ public void cancel() {
}

protected Path getWorkspacePath() {
return Paths.get(workspaceRoot, workerId);
return workspacePath;
}

protected String readFileFromWorkspace(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public class SingerDiscoveryWorker extends BaseSingerWorker<DiscoveryOutput> {

private final String configDotJson;
private final SingerTap tap;
private DiscoveryOutput output;

public SingerDiscoveryWorker(
String workerId,
Expand All @@ -33,7 +32,7 @@ public SingerDiscoveryWorker(
}

@Override
public OutputAndStatus<DiscoveryOutput> run() {
public OutputAndStatus<DiscoveryOutput> runInternal() {
// TODO use format converter here
// write config.json to disk
String configPath = writeFileToWorkspace(CONFIG_JSON_FILENAME, configDotJson);
Expand All @@ -46,16 +45,24 @@ public OutputAndStatus<DiscoveryOutput> run() {
getWorkspacePath().resolve(ERROR_LOG_FILENAME).toAbsolutePath().toString();
// exec
try {

String[] cmd = {tapPath, "--config", configPath, "--discover"};

Process workerProcess =
new ProcessBuilder(tapPath, "--config " + configPath, "--discover")
new ProcessBuilder(cmd)
.redirectError(new File(errorLogPath))
.redirectOutput(new File(catalogDotJsonPath))
.start();
workerProcess.wait();
if (workerProcess.exitValue() == 0) {
synchronized (workerProcess) {
workerProcess.wait();
}
int exitCode = workerProcess.exitValue();
if (exitCode == 0) {
String catalog = readFileFromWorkspace(CATALOG_JSON_FILENAME);
return new OutputAndStatus<>(SUCCESSFUL, new DiscoveryOutput(catalog));
} else {
LOGGER.debug(
"Discovery worker {} subprocess finished with exit code {}", workerId, exitCode);
return new OutputAndStatus<>(FAILED);
}
} catch (IOException | InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public enum SingerTap implements SingerConnector {
// TODO
S3_CSV("", ""),
POSTGRES("", ""),
POSTGRES("tap-postgres", "tap-postgres"),
STRIPE("", "");

private final String getPythonVirtualEnvName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ public abstract class BaseWorkerTestCase {
@BeforeAll
public void init() {
createTestWorkspace();
deleteWorkspaceUponJvmExit();
try {
FileUtils.forceDeleteOnExit(workspaceDirectory.toFile());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected Path getWorkspacePath() {
Expand All @@ -24,23 +28,11 @@ protected Path getWorkspacePath() {

private void createTestWorkspace() {
try {
workspaceDirectory = Paths.get("/tmp/tests/dataline-" + UUID.randomUUID().toString());
workspaceDirectory =
Paths.get("/tmp/tests/dataline-" + UUID.randomUUID().toString().substring(0, 8));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Files.createTempDirectory("dataline");?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah didn't know about that one -- will take a look. TY for the tip.

FileUtils.forceMkdir(workspaceDirectory.toFile());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void deleteWorkspaceUponJvmExit() {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
try {
FileUtils.deleteDirectory(workspaceDirectory.toFile());
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,75 @@
package io.dataline.workers.singer;

public class TestSingerDiscoveryWorker {
// TODO pending installing singer binaries into the workspace
import static io.dataline.workers.JobStatus.SUCCESSFUL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.io.Resources;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.DiscoveryOutput;
import io.dataline.workers.OutputAndStatus;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious do these test past on github?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup!


public class TestSingerDiscoveryWorker extends BaseWorkerTestCase {

@Test
public void testPostgresDiscovery() throws SQLException, IOException {
PostgreSQLContainer db = new PostgreSQLContainer();
db.start();
Connection con =
DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
con.createStatement().execute("CREATE TABLE id_and_name (id integer, name VARCHAR(200));");

String postgresCreds = getPostgresConfigJson(db);
SingerDiscoveryWorker worker =
new SingerDiscoveryWorker(
"1",
postgresCreds,
SingerTap.POSTGRES,
getWorkspacePath().toAbsolutePath().toString(),
"/usr/local/lib/singer/"); // TODO inject as env variable

System.out.println(getWorkspacePath().toAbsolutePath().toString());
System.out.println(postgresCreds);
OutputAndStatus<DiscoveryOutput> run = worker.run();
assertEquals(SUCCESSFUL, run.status);

String expectedCatalog =
Resources.toString(
Resources.getResource("simple_postgres_catalog.json"), Charset.defaultCharset());
assertTrue(run.output.isPresent());
assertEquals(expectedCatalog, run.output.get().catalog);
}

private String readResource(String name) {
URL resource = Resources.getResource(name);
try {
return Resources.toString(resource, Charset.defaultCharset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String getPostgresConfigJson(PostgreSQLContainer psqlContainer)
throws JsonProcessingException {
Map<String, String> props = Maps.newHashMap();
props.put("dbname", psqlContainer.getDatabaseName());
props.put("user", psqlContainer.getUsername());
props.put("password", psqlContainer.getPassword());
props.put("host", psqlContainer.getHost());
props.put("port", String.valueOf(psqlContainer.getFirstMappedPort()));

return new ObjectMapper().writeValueAsString(props);
}
}
Loading