@@ -44,40 +44,36 @@ public DefaultAirbyteStreamFactory() {
44
44
public Stream <AirbyteMessage > create (final BufferedReader bufferedReader ) {
45
45
return bufferedReader
46
46
.lines ()
47
- .map ( s -> {
48
- final Optional <JsonNode > j = Jsons .tryDeserialize (s );
49
- if (j .isEmpty ()) {
47
+ .flatMap ( line -> {
48
+ final Optional <JsonNode > jsonLine = Jsons .tryDeserialize (line );
49
+ if (jsonLine .isEmpty ()) {
50
50
// we log as info all the lines that are not valid json
51
51
// some sources actually log their process on stdout, we
52
52
// want to make sure this info is available in the logs.
53
- logger .info (s );
53
+ logger .info (line );
54
54
}
55
- return j ;
55
+ return jsonLine . stream () ;
56
56
})
57
- .filter (Optional ::isPresent )
58
- .map (Optional ::get )
59
57
// filter invalid messages
60
- .filter (j -> {
61
- final boolean res = protocolValidator .test (j );
58
+ .filter (jsonLine -> {
59
+ final boolean res = protocolValidator .test (jsonLine );
62
60
if (!res ) {
63
- logger .error ("Validation failed: {}" , Jsons .serialize (j ));
61
+ logger .error ("Validation failed: {}" , Jsons .serialize (jsonLine ));
64
62
}
65
63
return res ;
66
64
})
67
- .map ( j -> {
68
- final Optional <AirbyteMessage > m = Jsons .tryObject (j , AirbyteMessage .class );
65
+ .flatMap ( jsonLine -> {
66
+ final Optional <AirbyteMessage > m = Jsons .tryObject (jsonLine , AirbyteMessage .class );
69
67
if (m .isEmpty ()) {
70
- logger .error ("Deserialization failed: {}" , Jsons .serialize (j ));
68
+ logger .error ("Deserialization failed: {}" , Jsons .serialize (jsonLine ));
71
69
}
72
- return m ;
70
+ return m . stream () ;
73
71
})
74
- .filter (Optional ::isPresent )
75
- .map (Optional ::get )
76
72
// filter logs
77
- .filter (m -> {
78
- final boolean isLog = m .getType () == AirbyteMessage .Type .LOG ;
73
+ .filter (airbyteMessage -> {
74
+ final boolean isLog = airbyteMessage .getType () == AirbyteMessage .Type .LOG ;
79
75
if (isLog ) {
80
- internalLog (m .getLog ());
76
+ internalLog (airbyteMessage .getLog ());
81
77
}
82
78
return !isLog ;
83
79
});
0 commit comments