
Commit f71754d

close ssh in case of exception during check in Postgres connector (#10620)
* close ssh in case of exception
* remove unwanted change
* remove comment
* format
* do not close scanner
* fix semi-colon
* format
1 parent 12ddcdf commit f71754d

5 files changed, +52 -52 lines changed


airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java

Lines changed: 12 additions & 14 deletions
@@ -135,8 +135,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
         validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
         final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
         final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
-        final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
-        try (messageIterator) {
+        try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
           AirbyteSentry.executeWithTracing("ReadSource", () -> messageIterator.forEachRemaining(outputRecordCollector::accept));
         }
       }
@@ -145,8 +144,9 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
         final JsonNode config = parseConfig(parsed.getConfigPath());
         validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
         final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
-        final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
-        AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
+        try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
+          AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
+        }
       }
       default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
     }
@@ -159,16 +159,14 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc
     // use a Scanner that only processes new line characters to strictly abide with the
     // https://jsonlines.org/ standard
     final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
-    try (consumer) {
-      consumer.start();
-      while (input.hasNext()) {
-        final String inputString = input.next();
-        final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
-        if (messageOptional.isPresent()) {
-          consumer.accept(messageOptional.get());
-        } else {
-          LOGGER.error("Received invalid message: " + inputString);
-        }
+    consumer.start();
+    while (input.hasNext()) {
+      final String inputString = input.next();
+      final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
+      if (messageOptional.isPresent()) {
+        consumer.accept(messageOptional.get());
+      } else {
+        LOGGER.error("Received invalid message: " + inputString);
       }
     }
   }
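
Note: for context on the change above, here is a minimal, self-contained sketch (not Airbyte code; the TrackedResource class is hypothetical) showing why binding the resource in the try-with-resources header is enough to guarantee close() runs even when the body throws.

public class TryWithResourcesSketch {

  // Hypothetical AutoCloseable used only for illustration.
  static final class TrackedResource implements AutoCloseable {

    boolean closed = false;

    @Override
    public void close() {
      closed = true;
      System.out.println("close() was called");
    }

  }

  public static void main(final String[] args) {
    final TrackedResource resource = new TrackedResource();
    try {
      // The resource is bound in the try header, so close() runs whether the
      // block completes normally or throws.
      try (resource) {
        throw new IllegalStateException("simulated failure while reading");
      }
    } catch (final IllegalStateException e) {
      System.out.println("caught: " + e.getMessage() + ", closed=" + resource.closed);
    }
  }

}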

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java

Lines changed: 12 additions & 1 deletion
@@ -14,9 +14,12 @@
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.ConnectorSpecification;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class SshWrappedSource implements Source {

+  private static final Logger LOGGER = LoggerFactory.getLogger(SshWrappedSource.class);
   private final Source delegate;
   private final List<String> hostKey;
   private final List<String> portKey;
@@ -46,7 +49,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
   public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
       throws Exception {
     final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey);
-    return AutoCloseableIterators.appendOnClose(delegate.read(tunnel.getConfigInTunnel(), catalog, state), tunnel::close);
+    final AutoCloseableIterator<AirbyteMessage> delegateRead;
+    try {
+      delegateRead = delegate.read(tunnel.getConfigInTunnel(), catalog, state);
+    } catch (final Exception e) {
+      LOGGER.error("Exception occurred while getting the delegate read iterator, closing SSH tunnel", e);
+      tunnel.close();
+      throw e;
+    }
+    return AutoCloseableIterators.appendOnClose(delegateRead, tunnel::close);
   }

 }
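
Note: the read() change above follows a close-on-failure pattern: if obtaining the delegate iterator throws, the tunnel is closed immediately and the exception rethrown; otherwise closing is deferred to the returned iterator via appendOnClose. Below is a rough standalone sketch of the same pattern, using hypothetical Tunnel and Reader types in place of SshTunnel and the delegate source.

import java.io.Closeable;
import java.io.IOException;

public class CloseOnFailureSketch {

  interface Reader extends Closeable {

    String read() throws IOException;

  }

  static final class Tunnel implements Closeable {

    @Override
    public void close() {
      System.out.println("tunnel closed");
    }

  }

  // If opening the reader fails, close the tunnel before rethrowing so it is
  // not leaked; on success, the caller owns both resources.
  static Reader openThroughTunnel(final Tunnel tunnel) throws IOException {
    final Reader reader;
    try {
      reader = openReader(tunnel); // may throw, e.g. bad credentials
    } catch (final Exception e) {
      tunnel.close();
      throw e;
    }
    return reader;
  }

  static Reader openReader(final Tunnel tunnel) throws IOException {
    throw new IOException("simulated failure while opening the delegate");
  }

  public static void main(final String[] args) {
    try {
      openThroughTunnel(new Tunnel());
    } catch (final IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }

}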

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java

Lines changed: 14 additions & 17 deletions
@@ -241,14 +241,13 @@ void testDestinationConsumerLifecycleSuccess() throws Exception {
         + Jsons.serialize(message2) + "\n"
         + Jsons.serialize(stateMessage)).getBytes()));

-    final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
-    IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock);
-
-    final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
-    inOrder.verify(airbyteMessageConsumerMock).accept(message1);
-    inOrder.verify(airbyteMessageConsumerMock).accept(message2);
-    inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage);
-    inOrder.verify(airbyteMessageConsumerMock).close();
+    try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) {
+      IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock);
+      final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
+      inOrder.verify(airbyteMessageConsumerMock).accept(message1);
+      inOrder.verify(airbyteMessageConsumerMock).accept(message2);
+      inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage);
+    }
   }

   @Test
@@ -267,15 +266,13 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
         .withEmittedAt(EMITTED_AT));
     System.setIn(new ByteArrayInputStream((Jsons.serialize(message1) + "\n" + Jsons.serialize(message2)).getBytes()));

-    final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
-    doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1);
-
-    assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));
-
-    final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
-    inOrder.verify(airbyteMessageConsumerMock).accept(message1);
-    inOrder.verify(airbyteMessageConsumerMock).close();
-    inOrder.verifyNoMoreInteractions();
+    try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) {
+      doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1);
+      assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));
+      final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
+      inOrder.verify(airbyteMessageConsumerMock).accept(message1);
+      inOrder.verifyNoMoreInteractions();
+    }
   }

 }
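
Note: in the updated tests the mocked consumer is wrapped in try-with-resources, so it is still closed even though consumeWriteStream no longer closes it, and the explicit close() verifications were dropped. A small sketch of that try-with-resources behavior on a Mockito mock (assuming Mockito is on the classpath; the class name is illustrative):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class MockCloseSketch {

  public static void main(final String[] args) throws Exception {
    final AutoCloseable closeable = mock(AutoCloseable.class);
    // The try-with-resources statement, not the code under test, calls close().
    try (closeable) {
      // body intentionally empty
    }
    verify(closeable).close(); // passes: close() was invoked exactly once
  }

}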

airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java

Lines changed: 7 additions & 7 deletions
@@ -115,18 +115,18 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
     final JsonNode jdbcConfig = toDatabaseConfig(config);

     final JdbcDatabase database = Databases.createJdbcDatabase(
-      jdbcConfig.get("username").asText(),
-      jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
-      jdbcConfig.get("jdbc_url").asText(),
-      driverClass,
-      jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
-      sourceOperations);
+        jdbcConfig.get("username").asText(),
+        jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
+        jdbcConfig.get("jdbc_url").asText(),
+        driverClass,
+        jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
+        sourceOperations);

     quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);

     return new CockroachJdbcDatabase(database, sourceOperations);
   }
-
+
   private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
     return connection -> {
       final PreparedStatement ps = connection.prepareStatement(

airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java

Lines changed: 7 additions & 13 deletions
@@ -9,10 +9,6 @@
 import io.airbyte.commons.functional.CheckedFunction;
 import io.airbyte.db.JdbcCompatibleSourceOperations;
 import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
-
-import javax.sql.DataSource;
-
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
@@ -22,17 +18,15 @@
 import java.util.stream.Stream;

 /**
- * This implementation uses non-streamed queries to CockroachDB. CockroachDB
- * does not currently support multiple active pgwire portals on the same session,
- * which makes it impossible to replicate tables that have over ~1000 rows
- * using StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2
- * and in particular, the comment:
- * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351
- * The same situation as kafka-connect applies to StreamingJdbcDatabase
+ * This implementation uses non-streamed queries to CockroachDB. CockroachDB does not currently
+ * support multiple active pgwire portals on the same session, which makes it impossible to
+ * replicate tables that have over ~1000 rows using StreamingJdbcDatabase. See:
+ * https://go.crdb.dev/issue-v/40195/v21.2 and in particular, the comment:
+ * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351 The
+ * same situation as kafka-connect applies to StreamingJdbcDatabase
 */
 public class CockroachJdbcDatabase
-    extends JdbcDatabase
-{
+    extends JdbcDatabase {

   private final JdbcDatabase database;
