Handle EventTime msgpack extension to handle nanosecond precision time and add its parameter #18

Merged: 6 commits, Sep 2, 2019
46 changes: 35 additions & 11 deletions lib/logstash/codecs/fluent.rb
@@ -10,7 +10,9 @@
# [source,ruby]
# input {
# tcp {
# codec => fluent
# codec => fluent {
# nanosecond_precision => true
# }
# port => 4000
# }
# }
@@ -22,15 +24,23 @@
#
# Notes:
#
# * the fluent uses a second-precision time for events, so you will never see
# subsecond precision on events processed by this codec.
# * to handle the EventTime msgpack extension, you must set the nanosecond_precision parameter to true.
#
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
require "logstash/codecs/fluent/event_time"

Review comment:

I had to change this to require_relative "event_time" to make it work on a new Logstash 6.2.2 setup; otherwise you get {:exception=>#<LoadError: no such file to load -- logstash/codecs/fluent/event_time>

cosmo0920 (Contributor, Author), Feb 27, 2018:

Umm..., in my environment, require_relative "event_time" does not work. :'( require_relative "fluent/event_time" and require "logstash/codecs/fluent/event_time" work. 😖

My plugin setup procedure is:

$ gem build logstash-codec-fluent.gemspec
$ logstash-plugin install logstash-codec-fluent-*.gem

Reply:

Interesting. I'll do another test tomorrow and I'll let you know.
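The discrepancy in this thread comes down to how Ruby resolves the two forms: require searches $LOAD_PATH, while require_relative resolves against the directory of the file that calls it. Since the require lives in lib/logstash/codecs/fluent.rb, a bare require_relative "event_time" resolves one directory too high. A minimal sketch of the resolution (the path strings are illustrative, not taken from a real install):

```ruby
# require_relative resolves against the calling file's directory (__dir__).
calling_file = "lib/logstash/codecs/fluent.rb"   # illustrative path
dir = File.dirname(calling_file)                 # => "lib/logstash/codecs"

# A bare "event_time" lands beside fluent.rb, not inside fluent/:
wrong = File.expand_path("event_time", dir)         # .../logstash/codecs/event_time
right = File.expand_path("fluent/event_time", dir)  # .../logstash/codecs/fluent/event_time

puts wrong.end_with?("logstash/codecs/event_time")         # prints "true"
puts right.end_with?("logstash/codecs/fluent/event_time")  # prints "true"
```

This matches cosmo0920's observation that require_relative "fluent/event_time" works where require_relative "event_time" does not.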


config_name "fluent"

config :nanosecond_precision, :validate => :boolean, :default => false

def register
require "msgpack"
@decoder = MessagePack::Unpacker.new
@factory = MessagePack::Factory.new
if @nanosecond_precision
@factory.register_type(EventTime::TYPE, EventTime)
end
@packer = @factory.packer
@decoder = @factory.unpacker
end

def decode(data, &block)
@@ -43,14 +53,19 @@ def encode(event)
# Ensure tag to "tag1.tag2.tag3" style string.
# Fluentd cannot handle Array class value in forward protocol's tag.
tag = forwardable_tag(event)
epochtime = event.timestamp.to_i
epochtime = if @nanosecond_precision
EventTime.new(event.timestamp.to_i, event.timestamp.usec * 1000)
else
event.timestamp.to_i
end

# use normalize to make sure returned Hash is pure Ruby for
# MessagePack#pack which relies on pure Ruby object recognition
data = LogStash::Util.normalize(event.to_hash)
# timestamp is serialized as a iso8601 string
# merge to avoid modifying data which could have side effects if multiple outputs
@on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
@packer.clear
@on_event.call(event, @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
end # def encode

def forwardable_tag(event)
@@ -67,6 +82,15 @@ def forwardable_tag(event)

private

def decode_fluent_time(fluent_time)
case fluent_time
when Fixnum
fluent_time
when EventTime
Time.at(fluent_time.sec, fluent_time.nsec)
Review comment:

We are using the latest version of this plugin and are running into a problem which, in our opinion, is caused by this line. As far as I can see, calling Time.at with just two parameters expects the second parameter to specify microseconds, not nanoseconds, which in our case changed the timestamps and suddenly added roughly 10 minutes.

Shouldn't this read Time.at(fluent_time.sec, fluent_time.nsec / 1000.0) instead?
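The reviewer's arithmetic checks out: Ruby's two-argument Time.at(sec, n) treats n as microseconds, so passing a nanosecond count inflates the offset by a factor of 1000. A standalone sketch of the effect, with values chosen to reproduce the "roughly 10 minutes" drift:

```ruby
sec  = 1_500_000_000
nsec = 600_000_000            # 0.6 s past the second, expressed in nanoseconds

# Two-argument Time.at treats the second argument as MICROseconds:
wrong = Time.at(sec, nsec)           # 600_000_000 us = 600 s = 10 minutes late
right = Time.at(sec, nsec / 1000.0)  # convert ns -> us first

puts wrong.to_i - right.to_i   # prints "600"
puts right.nsec                # prints "600000000"
```

On Ruby 2.5+ the explicit-unit form Time.at(sec, nsec, :nanosecond) avoids the conversion entirely.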

end
end

def decode_event(data, &block)
tag = data[0]
entries = data[1]
@@ -80,9 +104,9 @@
raise(LogStash::Error, "PackedForward with compression is not supported")
end

entries_decoder = MessagePack::Unpacker.new
entries_decoder = @decoder
entries_decoder.feed_each(entries) do |entry|
epochtime = entry[0]
epochtime = decode_fluent_time(entry[0])
map = entry[1]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
@@ -93,17 +117,17 @@
when Array
# Forward
entries.each do |entry|
epochtime = entry[0]
epochtime = decode_fluent_time(entry[0])
map = entry[1]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
"tags" => [ tag ]
))
yield event
end
when Fixnum
when Fixnum, EventTime
# Message
epochtime = entries
epochtime = decode_fluent_time(entries)
map = data[2]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
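For context on the decode path above: decode_event dispatches on the class of the payload's second element, which is how Fluentd's forward protocol distinguishes its three event modes. A rough sketch of that dispatch (the example payloads are made up, and Fixnum was folded into Integer in Ruby 2.4+):

```ruby
# Classify a forward-protocol payload by the class of data[1],
# mirroring the case statement in decode_event.
def forward_mode(data)
  case data[1]
  when String  then :packed_forward  # concatenated msgpack-encoded entries
  when Array   then :forward         # [[time, record], [time, record], ...]
  when Integer then :message         # single [tag, time, record]
  else raise(ArgumentError, "unknown forward payload")
  end
end

puts forward_mode(["app.log", "\x92...", nil])                        # prints "packed_forward"
puts forward_mode(["app.log", [[1_567_000_000, {"name" => "foo"}]]])  # prints "forward"
puts forward_mode(["app.log", 1_567_000_000, {"name" => "foo"}])      # prints "message"
```

With nanosecond_precision enabled, the Message branch must also accept EventTime, which is exactly what the `when Fixnum, EventTime` change in the diff does.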
28 changes: 28 additions & 0 deletions lib/logstash/codecs/fluent/event_time.rb
@@ -0,0 +1,28 @@
module LogStash; module Codecs; class Fluent;
class EventTime
attr_reader :sec, :nsec

TYPE = 0

def initialize(sec, nsec = 0)
@sec = sec
@nsec = nsec
end

def to_msgpack(io = nil)
@sec.to_msgpack(io)
end

def to_msgpack_ext
[@sec, @nsec].pack('NN')
end

def self.from_msgpack_ext(data)
new(*data.unpack('NN'))
end

def to_json(*args)
@sec
end
end
end; end; end
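The ext payload above is just two big-endian 32-bit unsigned integers (seconds, then nanoseconds). A quick round-trip check using only the pack/unpack directives from the class (the values are illustrative):

```ruby
sec, nsec = 1_567_000_000, 123_456_789

payload = [sec, nsec].pack('NN')   # 'N' = 32-bit big-endian unsigned integer
puts payload.bytesize              # prints "8"

s, n = payload.unpack('NN')
puts s == sec && n == nsec         # prints "true"
```

One consequence of the 'N' directive worth noting: the seconds field is an unsigned 32-bit value, so this wire format cannot represent times past early 2106.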
65 changes: 54 additions & 11 deletions spec/codecs/fluent_spec.rb
@@ -2,8 +2,16 @@
require_relative "../spec_helper"
require "logstash/plugin"
require "logstash/event"
require "msgpack"

describe LogStash::Codecs::Fluent do
before do
@factory = MessagePack::Factory.new
@factory.register_type(LogStash::Codecs::Fluent::EventTime::TYPE,
LogStash::Codecs::Fluent::EventTime)
@packer = @factory.packer
@unpacker = @factory.unpacker
end

let(:properties) { {:name => "foo" } }
let(:event) { LogStash::Event.new(properties) }
@@ -17,9 +25,25 @@

it "should encode as message pack format" do
subject.on_event do |event, data|
fields = MessagePack.unpack(data)
expect(fields[0]).to eq("log")
expect(fields[2]["name"]).to eq("foo")
@unpacker.feed_each(data) do |fields|
expect(fields[0]).to eq("log")
expect(fields[2]["name"]).to eq("foo")
end
end
subject.encode(event)
end

end

describe "event encoding with EventTime" do
subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }

it "should encode as message pack format" do
subject.on_event do |event, data|
@unpacker.feed_each(data) do |fields|
expect(fields[0]).to eq("log")
expect(fields[2]["name"]).to eq("foo")
end
end
subject.encode(event)
end
@@ -32,8 +56,27 @@
let(:epochtime) { event.timestamp.to_i }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
end

it "should decode without errors" do
subject.decode(message) do |event|
expect(event.get("name")).to eq("foo")
end
end

end

describe "event decoding with EventTime" do

let(:tag) { "mytag" }
let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(event.timestamp.to_i,
event.timestamp.usec * 1000) }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
end
subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }

it "should decode without errors" do
subject.decode(message) do |event|
@@ -87,13 +130,13 @@
let(:epochtime) { event.timestamp.to_i }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
MessagePack.pack([tag,
[
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
]
])
@packer.pack([tag,
[
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
]
])
end

it "should decode without errors" do