File tree Expand file tree Collapse file tree 2 files changed +15
-0
lines changed
airbyte-commons-worker/src/main/java/io/airbyte/workers/internal Expand file tree Collapse file tree 2 files changed +15
-0
lines changed Original file line number Diff line number Diff line change 8
8
import io .airbyte .commons .protocol .AirbyteMessageVersionedMigratorFactory ;
9
9
import io .airbyte .commons .version .Version ;
10
10
import java .io .BufferedWriter ;
11
+ import org .slf4j .Logger ;
12
+ import org .slf4j .LoggerFactory ;
11
13
12
14
public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMessageBufferedWriterFactory {
13
15
16
+ private static final Logger LOGGER = LoggerFactory .getLogger (VersionedAirbyteMessageBufferedWriterFactory .class );
17
+
14
18
private final AirbyteMessageSerDeProvider serDeProvider ;
15
19
private final AirbyteMessageVersionedMigratorFactory migratorFactory ;
16
20
private final Version protocolVersion ;
@@ -25,6 +29,11 @@ public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDePro
25
29
26
30
@ Override
27
31
public AirbyteMessageBufferedWriter createWriter (BufferedWriter bufferedWriter ) {
32
+ final boolean needMigration = !protocolVersion .getMajorVersion ().equals (migratorFactory .getMostRecentVersion ().getMajorVersion ());
33
+ LOGGER .info (
34
+ "Writing messages to protocol version {}{}" ,
35
+ protocolVersion .serialize (),
36
+ needMigration ? ", messages will be downgraded from protocol version " + migratorFactory .getMostRecentVersion ().serialize () : "" );
28
37
return new VersionedAirbyteMessageBufferedWriter <>(
29
38
bufferedWriter ,
30
39
serDeProvider .getSerializer (protocolVersion ).orElseThrow (),
Original file line number Diff line number Diff line change @@ -92,6 +92,12 @@ public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
92
92
initializeForProtocolVersion (fallbackVersion );
93
93
}
94
94
}
95
+
96
+ final boolean needMigration = !protocolVersion .getMajorVersion ().equals (migratorFactory .getMostRecentVersion ().getMajorVersion ());
97
+ logger .info (
98
+ "Reading messages from protocol version {}{}" ,
99
+ protocolVersion .serialize (),
100
+ needMigration ? ", messages will be upgraded to protocol version " + migratorFactory .getMostRecentVersion ().serialize () : "" );
95
101
return super .create (bufferedReader );
96
102
}
97
103
You can’t perform that action at this time.
0 commit comments