Skip to content

Commit 6907038

Browse files
committed
revert: fix: orchestrator shouldn't fail fast on ReplicationTask error (#16969)
1 parent 412193a commit 6907038

File tree

2 files changed

+8
-18
lines changed

2 files changed

+8
-18
lines changed

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/ReplicationTask.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class DestinationReader(
5757
}
5858
} catch (e: Exception) {
5959
logger.error(e) { "DestinationReader error: " }
60-
replicationWorkerState.abort()
6160
if (e is DestinationException) {
6261
throw e
6362
} else if (e !is CancellationException) {
@@ -105,7 +104,6 @@ class DestinationWriter(
105104
}
106105
} catch (e: Exception) {
107106
logger.error(e) { "DestinationWriter error: " }
108-
replicationWorkerState.abort()
109107
handleException(e)
110108
} finally {
111109
notifyEndOfInput()
@@ -118,7 +116,6 @@ class DestinationWriter(
118116
try {
119117
destination.notifyEndOfInput()
120118
} catch (e: Exception) {
121-
replicationWorkerState.abort()
122119
handleException(e)
123120
}
124121
}
@@ -210,7 +207,6 @@ class SourceReader(
210207
}
211208
} catch (e: Exception) {
212209
logger.error(e) { "SourceReader error: " }
213-
replicationWorkerState.abort()
214210
if (e is SourceException) {
215211
throw e
216212
} else if (e !is CancellationException) {

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/ReplicationWorker.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import kotlinx.coroutines.awaitAll
2525
import kotlinx.coroutines.coroutineScope
2626
import kotlinx.coroutines.runBlocking
2727
import kotlinx.coroutines.slf4j.MDCContext
28-
import kotlinx.coroutines.supervisorScope
2928
import kotlinx.coroutines.withContext
3029
import org.slf4j.MDC
3130
import java.nio.file.Path
@@ -146,18 +145,13 @@ class ReplicationWorker(
146145
dispatcher: ExecutorCoroutineDispatcher,
147146
mdc: Map<String, String>,
148147
) {
149-
// Use supervisorScope instead of coroutineScope so that a failure in one job
150-
// doesn't immediately cancel all the other jobs.
151-
// This allows other jobs to terminate gracefully, which is necessary
152-
// to avoid dropping trace messages.
153-
// In particular, if the source connector emits a trace and then immediately exits,
154-
// we want to allow the MessageProcessor tak to read that trace message.
155-
// Tasks are responsible for calling replicationWorkerState.abort()
156-
// to signal other tasks to terminate.
157-
supervisorScope {
158-
syncReplicationJobs.map { job ->
159-
AsyncUtils.runAsync(dispatcher, this, mdc) { job.run() }
160-
}
161-
}.awaitAll()
148+
coroutineScope {
149+
val tasks =
150+
syncReplicationJobs.map { job ->
151+
AsyncUtils.runAsync(dispatcher, this, mdc) { job.run() }
152+
}
153+
154+
tasks.awaitAll()
155+
}
162156
}
163157
}

0 commit comments

Comments
 (0)