Description
I am sending binary data (in my case, Protobuf messages) to Logstash, and the current implementation of the codec fails to generate events with the appropriate payload.
I'm using a filter downstream to decode the Protobuf, so all we need is a sane binary representation of the data.
Given a config of
input {
  tcp {
    codec => fluent
    port => 24224
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
and sending a binary message, we get the error:
[2019-02-15T10:33:39,004][ERROR][logstash.codecs.fluent ] Fluent parse error, original data now in message field {:error=>#<NoMethodError: undefined method `merge' for #<String:0x4698b154>>, :data=>["N321GG_e2p", [[1550160094, "\n\x00\"\x9B\x01\n\a\n\x05%_v\xB4SZ\x8F\x01\n\x8C\x01\b\xFFD\x10\x01\x1A\t\tE#\x01\"\x14\x00\x00\x00\"\x03uid*\x05\r\x01\x00\x00\x7F2\x04iptv:\x05CNN_1R\x05-\x00\x00\x00\x00Z\x05-\x00\x00\x00\x00b\x02\b\x00j\a\n\x05%_v\xB4Sr\x05-\x00\x00\x00\x00z\x02ua\x82\x01\x04hash\x8A\x01$142f168d-374b-4f7e-8097-beeff02d4571\x92\x01\x06seatId\x9A\x01\x03ped"]]]}
Presumably you can reproduce this by sending any non-UTF-8 data over the Fluentd forward protocol.
It appears the error stems from incomplete handling of the Forward (array) mode in the codec: it calls `merge` on the decoded payload, which works when the payload is a Hash but raises `NoMethodError` when it is a raw String. Ruby expects the payload to quack like a map, and when that duck meows, the codec fails.
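The duck-typing failure is easy to see in plain Ruby, independent of the codec: a Hash responds to `merge`, a raw String does not.

```ruby
# A decoded Forward entry payload is expected to be a map (Hash):
record = { "log" => "hello" }
record.respond_to?(:merge)    # => true
record.merge("tags" => ["t"]) # works fine

# A binary Protobuf payload decodes as a raw String instead:
payload = "\x0A\x00\x22\x9B".b
payload.respond_to?(:merge)   # => false
# payload.merge("tags" => ["t"]) would raise the NoMethodError above
```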
I am able to hack things into working with the following change to fluent.rb, lines 98-102:
 93     when Array
 94       # Forward
 95       entries.each do |entry|
 96         epochtime = entry[0]
 97         map = entry[1]
 98         if !map.respond_to? :merge
 99           map = {
100             "message" => entry[1]
101           }
102         end
103         event = LogStash::Event.new(map.merge(
104           LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
105           "tags" => [ tag ]
106         ))
107         yield event
108       end
I'm not much of a Ruby dev, so I'm looking for the best way to implement binary ingest, hopefully in a way that gets merged upstream.