diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
index 6ab4ef51bab77..35aae3d1beeec 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
@@ -9,6 +9,7 @@
 import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
 import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import datadog.trace.api.Trace;
 import io.airbyte.commons.io.LineGobbler;
@@ -139,7 +140,6 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
     destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));
 
     final ThreadedTimeTracker timeTracker = new ThreadedTimeTracker();
-    final long startTime = System.currentTimeMillis();
     timeTracker.trackReplicationStartTime();
 
     final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
@@ -152,202 +152,137 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
                 s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode())))));
 
       final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput);
 
-      final Map<String, String> mdc = MDC.getCopyOfContextMap();
-      ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));
-
-      // note: resources are closed in the opposite order in which they are declared. thus source will be
-      // closed first (which is what we want).
-      try (destination; source) {
-        destination.start(destinationConfig, jobRoot);
-        timeTracker.trackSourceReadStartTime();
-        source.start(sourceConfig, jobRoot);
-        timeTracker.trackDestinationWriteStartTime();
-
-        // note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
-        // thrown
-        final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
-            getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
-            executors)
-            .whenComplete((msg, ex) -> {
-              if (ex != null) {
-                if (ex.getCause() instanceof DestinationException) {
-                  destinationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
-                } else {
-                  destinationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
-                }
-              }
-            });
-
-        final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
-            getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker),
-            executors)
-            .whenComplete((msg, ex) -> {
-              if (ex != null) {
-                if (ex.getCause() instanceof SourceException) {
-                  replicationRunnableFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
-                } else if (ex.getCause() instanceof DestinationException) {
-                  replicationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
-                } else {
-                  replicationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
-                }
-              }
-            });
-
-        LOGGER.info("Waiting for source and destination threads to complete.");
-        // CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
-        // exception. So in order to handle exceptions from a future immediately without needing to wait for
-        // the other future to finish, we first call CompletableFuture#anyOf.
-        CompletableFuture.anyOf(replicationThreadFuture, destinationOutputThreadFuture).get();
-        LOGGER.info("One of source or destination thread complete. Waiting on the other.");
-        CompletableFuture.allOf(replicationThreadFuture, destinationOutputThreadFuture).get();
-        LOGGER.info("Source and destination threads complete.");
-
-      } catch (final Exception e) {
-        hasFailed.set(true);
-        ApmTraceUtils.addExceptionToTrace(e);
-        LOGGER.error("Sync worker failed.", e);
-      } finally {
-        executors.shutdownNow();
-      }
-
-      final ReplicationStatus outputStatus;
-      // First check if the process was cancelled. Cancellation takes precedence over failures.
-      if (cancelled.get()) {
-        outputStatus = ReplicationStatus.CANCELLED;
-      }
-      // if the process was not cancelled but still failed, then it's an actual failure
-      else if (hasFailed.get()) {
-        outputStatus = ReplicationStatus.FAILED;
-      } else {
-        outputStatus = ReplicationStatus.COMPLETED;
-      }
-
+      replicate(jobRoot, destinationConfig, timeTracker, replicationRunnableFailureRef, destinationRunnableFailureRef, sourceConfig);
       timeTracker.trackReplicationEndTime();
 
-      final SyncStats totalSyncStats = new SyncStats()
-          .withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
-          .withBytesEmitted(messageTracker.getTotalBytesEmitted())
-          .withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
-          .withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
-          .withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
-          .withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
-          .withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
-          .withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
-          .withReplicationStartTime(timeTracker.getReplicationStartTime())
-          .withReplicationEndTime(timeTracker.getReplicationEndTime())
-          .withSourceReadStartTime(timeTracker.getSourceReadStartTime())
-          .withSourceReadEndTime(timeTracker.getSourceReadEndTime())
-          .withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
-          .withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());
-
-      if (outputStatus == ReplicationStatus.COMPLETED) {
-        totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
-      } else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
-        totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
-      } else {
-        LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
-        totalSyncStats.setRecordsCommitted(null);
-      }
-
-      // assume every stream with stats is in streamToEmittedRecords map
-      final List<StreamSyncStats> streamSyncStats = messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
-        final SyncStats syncStats = new SyncStats()
-            .withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
-            .withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
-            .withSourceStateMessagesEmitted(null)
-            .withDestinationStateMessagesEmitted(null);
-
-        if (outputStatus == ReplicationStatus.COMPLETED) {
-          syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
-        } else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
-          syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
-        } else {
-          syncStats.setRecordsCommitted(null);
-        }
-        return new StreamSyncStats()
-            .withStreamName(stream)
-            .withStats(syncStats);
-      }).collect(Collectors.toList());
-
-      final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
-          .withStatus(outputStatus)
-          .withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
-          .withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
-          .withTotalStats(totalSyncStats)
-          .withStreamStats(streamSyncStats)
-          .withStartTime(startTime)
-          .withEndTime(System.currentTimeMillis());
-
-      final ReplicationOutput output = new ReplicationOutput()
-          .withReplicationAttemptSummary(summary)
-          .withOutputCatalog(destinationConfig.getCatalog());
-
-      // only .setFailures() if a failure occurred or if there is an AirbyteErrorTraceMessage
-      final FailureReason sourceFailure = replicationRunnableFailureRef.get();
-      final FailureReason destinationFailure = destinationRunnableFailureRef.get();
-      final FailureReason traceMessageFailure = messageTracker.errorTraceMessageFailure(Long.valueOf(jobId), attempt);
-
-      final List<FailureReason> failures = new ArrayList<>();
-
-      if (traceMessageFailure != null) {
-        failures.add(traceMessageFailure);
-      }
-
-      if (sourceFailure != null) {
-        failures.add(sourceFailure);
-      }
-      if (destinationFailure != null) {
-        failures.add(destinationFailure);
-      }
-      if (!failures.isEmpty()) {
-        output.setFailures(failures);
-      }
+      return getReplicationOutput(syncInput, destinationConfig, replicationRunnableFailureRef, destinationRunnableFailureRef, timeTracker);
+    } catch (final Exception e) {
+      ApmTraceUtils.addExceptionToTrace(e);
+      throw new WorkerException("Sync failed", e);
+    }
 
-      if (messageTracker.getSourceOutputState().isPresent()) {
-        LOGGER.info("Source output at least one state message");
-      } else {
-        LOGGER.info("Source did not output any state messages");
-      }
+  }
 
-      if (messageTracker.getDestinationOutputState().isPresent()) {
-        LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
-        final State state = messageTracker.getDestinationOutputState().get();
-        output.withState(state);
-      } else if (syncInput.getState() != null) {
-        LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
-        output.withState(syncInput.getState());
-      } else {
-        LOGGER.warn("State capture: No state retained.");
-      }
 
+  private void replicate(Path jobRoot,
+                         WorkerDestinationConfig destinationConfig,
+                         ThreadedTimeTracker timeTracker,
+                         AtomicReference<FailureReason> replicationRunnableFailureRef,
+                         AtomicReference<FailureReason> destinationRunnableFailureRef,
+                         WorkerSourceConfig sourceConfig) {
+    final Map<String, String> mdc = MDC.getCopyOfContextMap();
+
+    // note: resources are closed in the opposite order in which they are declared. thus source will be
+    // closed first (which is what we want).
+    try (destination; source) {
+      destination.start(destinationConfig, jobRoot);
+      timeTracker.trackSourceReadStartTime();
+      source.start(sourceConfig, jobRoot);
+      timeTracker.trackDestinationWriteStartTime();
+
+      // note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
+      // thrown
+      final CompletableFuture<?> readFromDstThread = CompletableFuture.runAsync(
+          readFromDstRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
+          executors)
+          .whenComplete((msg, ex) -> {
+            if (ex != null) {
+              if (ex.getCause() instanceof DestinationException) {
+                destinationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
+              } else {
+                destinationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
+              }
+            }
+          });
 
-      if (messageTracker.getUnreliableStateTimingMetrics()) {
-        metricReporter.trackStateMetricTrackerError();
-      }
+      final CompletableFuture<?> readSrcAndWriteDstThread = CompletableFuture.runAsync(
+          readFromSrcAndWriteToDstRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter,
+              timeTracker),
+          executors)
+          .whenComplete((msg, ex) -> {
+            if (ex != null) {
+              if (ex.getCause() instanceof SourceException) {
+                replicationRunnableFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
+              } else if (ex.getCause() instanceof DestinationException) {
+                replicationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
+              } else {
+                replicationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
+              }
+            }
+          });
 
-      final ObjectMapper mapper = new ObjectMapper();
-      LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
-      LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));
+      LOGGER.info("Waiting for source and destination threads to complete.");
+      // CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
+      // exception. So in order to handle exceptions from a future immediately without needing to wait for
+      // the other future to finish, we first call CompletableFuture#anyOf.
+      CompletableFuture.anyOf(readSrcAndWriteDstThread, readFromDstThread).get();
+      LOGGER.info("One of source or destination thread complete. Waiting on the other.");
Waiting on the other."); + CompletableFuture.allOf(readSrcAndWriteDstThread, readFromDstThread).get(); + LOGGER.info("Source and destination threads complete."); - LineGobbler.endSection("REPLICATION"); - return output; } catch (final Exception e) { + hasFailed.set(true); ApmTraceUtils.addExceptionToTrace(e); - throw new WorkerException("Sync failed", e); + LOGGER.error("Sync worker failed.", e); + } finally { + executors.shutdownNow(); } + } + @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause") + private static Runnable readFromDstRunnable(final AirbyteDestination destination, + final AtomicBoolean cancelled, + final MessageTracker messageTracker, + final Map mdc, + final ThreadedTimeTracker timeHolder) { + return () -> { + MDC.setContextMap(mdc); + LOGGER.info("Destination output thread started."); + try { + while (!cancelled.get() && !destination.isFinished()) { + final Optional messageOptional; + try { + messageOptional = destination.attemptRead(); + } catch (final Exception e) { + throw new DestinationException("Destination process read attempt failed", e); + } + if (messageOptional.isPresent()) { + LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get()); + messageTracker.acceptFromDestination(messageOptional.get()); + } + } + timeHolder.trackDestinationWriteEndTime(); + if (!cancelled.get() && destination.getExitValue() != 0) { + throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue()); + } + } catch (final Exception e) { + if (!cancelled.get()) { + // Although this thread is closed first, it races with the destination's closure and can attempt one + // final read after the destination is closed before it's terminated. + // This read will fail and throw an exception. Because of this, throw exceptions only if the worker + // was not cancelled. + + if (e instanceof DestinationException) { + // Surface Destination exceptions directly so that they can be classified properly by the worker + throw e; + } else { + throw new RuntimeException(e); + } + } + } + }; } @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause") - private static Runnable getReplicationRunnable(final AirbyteSource source, - final AirbyteDestination destination, - final AtomicBoolean cancelled, - final AirbyteMapper mapper, - final MessageTracker messageTracker, - final Map mdc, - final RecordSchemaValidator recordSchemaValidator, - final WorkerMetricReporter metricReporter, - final ThreadedTimeTracker timeHolder) { + private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource source, + final AirbyteDestination destination, + final AtomicBoolean cancelled, + final AirbyteMapper mapper, + final MessageTracker messageTracker, + final Map mdc, + final RecordSchemaValidator recordSchemaValidator, + final WorkerMetricReporter metricReporter, + final ThreadedTimeTracker timeHolder) { return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); @@ -427,6 +362,159 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, }; } + private ReplicationOutput getReplicationOutput(StandardSyncInput syncInput, + WorkerDestinationConfig destinationConfig, + AtomicReference replicationRunnableFailureRef, + AtomicReference destinationRunnableFailureRef, + ThreadedTimeTracker timeTracker) + throws JsonProcessingException { + final ReplicationStatus outputStatus; + // First check if the process was cancelled. Cancellation takes precedence over failures. 
+    if (cancelled.get()) {
+      outputStatus = ReplicationStatus.CANCELLED;
+    }
+    // if the process was not cancelled but still failed, then it's an actual failure
+    else if (hasFailed.get()) {
+      outputStatus = ReplicationStatus.FAILED;
+    } else {
+      outputStatus = ReplicationStatus.COMPLETED;
+    }
+
+    final SyncStats totalSyncStats = getTotalStats(timeTracker, outputStatus);
+    final List<StreamSyncStats> streamSyncStats = getPerStreamStats(outputStatus);
+
+    final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
+        .withStatus(outputStatus)
+        .withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
+        .withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
+        .withTotalStats(totalSyncStats)
+        .withStreamStats(streamSyncStats)
+        .withStartTime(timeTracker.getReplicationStartTime())
+        .withEndTime(System.currentTimeMillis());
+
+    final ReplicationOutput output = new ReplicationOutput()
+        .withReplicationAttemptSummary(summary)
+        .withOutputCatalog(destinationConfig.getCatalog());
+
+    final List<FailureReason> failures = getFailureReasons(replicationRunnableFailureRef, destinationRunnableFailureRef,
+        output);
+
+    prepStateForLaterSaving(syncInput, output);
+
+    final ObjectMapper mapper = new ObjectMapper();
+    LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
+    LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));
+    LineGobbler.endSection("REPLICATION");
+
+    return output;
+  }
+
+  private SyncStats getTotalStats(ThreadedTimeTracker timeTracker, ReplicationStatus outputStatus) {
+    final SyncStats totalSyncStats = new SyncStats()
+        .withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
+        .withBytesEmitted(messageTracker.getTotalBytesEmitted())
+        .withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
+        .withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
+        .withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
+        .withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
+        .withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
+        .withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
+        .withReplicationStartTime(timeTracker.getReplicationStartTime())
+        .withReplicationEndTime(timeTracker.getReplicationEndTime())
+        .withSourceReadStartTime(timeTracker.getSourceReadStartTime())
+        .withSourceReadEndTime(timeTracker.getSourceReadEndTime())
+        .withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
+        .withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());
+
+    if (outputStatus == ReplicationStatus.COMPLETED) {
+      totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
+    } else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
+      totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
+    } else {
+      LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
+      totalSyncStats.setRecordsCommitted(null);
+    }
+    return totalSyncStats;
+  }
+
+  private List<StreamSyncStats> getPerStreamStats(ReplicationStatus outputStatus) {
+    // assume every stream with stats is in streamToEmittedRecords map
+    return messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
+      final SyncStats syncStats = new SyncStats()
+          .withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
+          .withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
+          .withSourceStateMessagesEmitted(null)
+          .withDestinationStateMessagesEmitted(null);
+
+      if (outputStatus == ReplicationStatus.COMPLETED) {
+        syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
+      } else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
+        syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
+      } else {
+        syncStats.setRecordsCommitted(null);
+      }
+      return new StreamSyncStats()
+          .withStreamName(stream)
+          .withStats(syncStats);
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts state out to the {@link ReplicationOutput} so it can be later saved in the
+   * PersistStateActivity - State is NOT SAVED here.
+   *
+   * @param syncInput
+   * @param output
+   */
+  private void prepStateForLaterSaving(StandardSyncInput syncInput, ReplicationOutput output) {
+    if (messageTracker.getSourceOutputState().isPresent()) {
+      LOGGER.info("Source output at least one state message");
+    } else {
+      LOGGER.info("Source did not output any state messages");
+    }
+
+    if (messageTracker.getDestinationOutputState().isPresent()) {
+      LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
+      final State state = messageTracker.getDestinationOutputState().get();
+      output.withState(state);
+    } else if (syncInput.getState() != null) {
+      LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
+      output.withState(syncInput.getState());
+    } else {
+      LOGGER.warn("State capture: No state retained.");
+    }
+
+    if (messageTracker.getUnreliableStateTimingMetrics()) {
+      metricReporter.trackStateMetricTrackerError();
+    }
+  }
+
+  private List<FailureReason> getFailureReasons(AtomicReference<FailureReason> replicationRunnableFailureRef,
+                                                AtomicReference<FailureReason> destinationRunnableFailureRef,
+                                                ReplicationOutput output) {
+    // only .setFailures() if a failure occurred or if there is an AirbyteErrorTraceMessage
+    final FailureReason sourceFailure = replicationRunnableFailureRef.get();
+    final FailureReason destinationFailure = destinationRunnableFailureRef.get();
+    final FailureReason traceMessageFailure = messageTracker.errorTraceMessageFailure(Long.valueOf(jobId), attempt);
+
+    final List<FailureReason> failures = new ArrayList<>();
+
+    if (traceMessageFailure != null) {
+      failures.add(traceMessageFailure);
+    }
+
+    if (sourceFailure != null) {
+      failures.add(sourceFailure);
+    }
+    if (destinationFailure != null) {
+      failures.add(destinationFailure);
+    }
+    if (!failures.isEmpty()) {
+      output.setFailures(failures);
+    }
+    return failures;
+  }
+
   private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
                                      final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors,
                                      final AirbyteMessage message) {
@@ -457,50 +545,6 @@ private static void validateSchema(final RecordSchemaValid
     }
   }
 
-  @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
-  private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
-                                                       final AtomicBoolean cancelled,
-                                                       final MessageTracker messageTracker,
-                                                       final Map<String, String> mdc,
-                                                       final ThreadedTimeTracker timeHolder) {
-    return () -> {
-      MDC.setContextMap(mdc);
-      LOGGER.info("Destination output thread started.");
-      try {
-        while (!cancelled.get() && !destination.isFinished()) {
-          final Optional<AirbyteMessage> messageOptional;
-          try {
-            messageOptional = destination.attemptRead();
-          } catch (final Exception e) {
-            throw new DestinationException("Destination process read attempt failed", e);
-          }
-          if (messageOptional.isPresent()) {
-            LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get());
-            messageTracker.acceptFromDestination(messageOptional.get());
-          }
-        }
-        timeHolder.trackDestinationWriteEndTime();
-        if (!cancelled.get() && destination.getExitValue() != 0) {
-          throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
-        }
-      } catch (final Exception e) {
-        if (!cancelled.get()) {
-          // Although this thread is closed first, it races with the destination's closure and can attempt one
-          // final read after the destination is closed before it's terminated.
-          // This read will fail and throw an exception. Because of this, throw exceptions only if the worker
-          // was not cancelled.
-
-          if (e instanceof DestinationException) {
-            // Surface Destination exceptions directly so that they can be classified properly by the worker
-            throw e;
-          } else {
-            throw new RuntimeException(e);
-          }
-        }
-      }
-    };
-  }
-
   @Trace(operationName = WORKER_OPERATION_NAME)
   @Override
   public void cancel() {
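
The `try (destination; source)` block in replicate() relies on try-with-resources closing resources in the reverse of their declaration order, which is why the code comment notes that the source is closed before the destination. A minimal, self-contained illustration of that behaviour (the class and resource names here are invented for the example and are not part of the worker):

// Standalone sketch: try-with-resources closes resources in the reverse of their
// declaration order, so `try (destination; source)` closes the source first.
public final class CloseOrderSketch {

  static final class NamedResource implements AutoCloseable {

    private final String name;

    NamedResource(final String name) {
      this.name = name;
    }

    @Override
    public void close() {
      System.out.println("closing " + name);
    }

  }

  public static void main(final String[] args) {
    final NamedResource destination = new NamedResource("destination");
    final NamedResource source = new NamedResource("source");
    try (destination; source) {
      System.out.println("replicating...");
    }
    // Prints: replicating..., closing source, closing destination
  }

}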
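
The note that `whenComplete` is used instead of `exceptionally` so that the original exception is still thrown can be seen in isolation with the following sketch (illustrative only; the class name and the simulated failure are made up, not part of the Airbyte codebase):

import java.util.concurrent.CompletableFuture;

// Standalone sketch: whenComplete observes the outcome but leaves the future failed,
// so the original exception still propagates; exceptionally replaces the failure
// with a fallback value and hides it.
public final class WhenCompleteSketch {

  public static void main(final String[] args) {
    final CompletableFuture<Void> failed = CompletableFuture.runAsync(() -> {
      throw new IllegalStateException("boom");
    });

    final CompletableFuture<Void> observed = failed.whenComplete((msg, ex) -> {
      if (ex != null) {
        System.out.println("recorded failure: " + ex.getCause());
      }
    });

    final CompletableFuture<Void> swallowed = failed.exceptionally(ex -> null);

    try {
      observed.join(); // still throws: the failure was observed, not handled
    } catch (final Exception e) {
      System.out.println("observed future still fails: " + e.getCause());
    }

    swallowed.join(); // completes normally: the failure was replaced with null
    System.out.println("exceptionally() hid the failure");
  }

}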
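
Finally, the comment explaining the `CompletableFuture.anyOf(...)` call before `CompletableFuture.allOf(...)` describes a fail-fast idiom: allOf alone would block until both threads finish, even if one had already failed. A small standalone sketch of that behaviour, assuming one quickly failing task and one slow task (class name, failure, and timings are invented for illustration):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone sketch: anyOf completes as soon as the first future finishes (normally or
// exceptionally), so a failure surfaces immediately instead of after the slow peer.
public final class AnyOfThenAllOfSketch {

  public static void main(final String[] args) {
    final ExecutorService executors = Executors.newFixedThreadPool(2);

    final CompletableFuture<?> failsFast = CompletableFuture.runAsync(() -> {
      throw new IllegalStateException("simulated source failure");
    }, executors);

    final CompletableFuture<?> slowTask = CompletableFuture.runAsync(() -> {
      try {
        Thread.sleep(5_000); // stands in for the long-running peer thread
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, executors);

    try {
      // Surfaces the simulated failure almost immediately instead of after ~5 seconds.
      CompletableFuture.anyOf(failsFast, slowTask).get();
      // Only reached if the first task to finish succeeded; now wait for the other one.
      CompletableFuture.allOf(failsFast, slowTask).get();
    } catch (final Exception e) {
      System.out.println("first completed future failed: " + e.getCause());
    } finally {
      executors.shutdownNow();
    }
  }

}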