-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle EventTime msgpack extension to handle nanosecond precision time and add its parameter #18
Changes from all commits
bfa3e2e
ae8f0f4
43ac5f6
c6fdbc8
bf83df6
ced704d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,9 @@ | |
# [source,ruby] | ||
# input { | ||
# tcp { | ||
# codec => fluent | ||
# codec => fluent { | ||
# nanosecond_precision => true | ||
# } | ||
# port => 4000 | ||
# } | ||
# } | ||
|
@@ -22,15 +24,23 @@ | |
# | ||
# Notes: | ||
# | ||
# * the fluent uses a second-precision time for events, so you will never see | ||
# subsecond precision on events processed by this codec. | ||
# * to handle EventTime msgpack extension, you must specify nanosecond_precision parameter as true. | ||
# | ||
class LogStash::Codecs::Fluent < LogStash::Codecs::Base | ||
require "logstash/codecs/fluent/event_time" | ||
|
||
config_name "fluent" | ||
|
||
config :nanosecond_precision, :validate => :boolean, :default => false | ||
|
||
def register | ||
require "msgpack" | ||
@decoder = MessagePack::Unpacker.new | ||
@factory = MessagePack::Factory.new | ||
if @nanosecond_precision | ||
@factory.register_type(EventTime::TYPE, EventTime) | ||
end | ||
@packer = @factory.packer | ||
@decoder = @factory.unpacker | ||
end | ||
|
||
def decode(data, &block) | ||
|
@@ -43,14 +53,19 @@ def encode(event) | |
# Ensure tag to "tag1.tag2.tag3" style string. | ||
# Fluentd cannot handle Array class value in forward protocol's tag. | ||
tag = forwardable_tag(event) | ||
epochtime = event.timestamp.to_i | ||
epochtime = if @nanosecond_precision | ||
EventTime.new(event.timestamp.to_i, event.timestamp.usec * 1000) | ||
else | ||
event.timestamp.to_i | ||
end | ||
|
||
# use normalize to make sure returned Hash is pure Ruby for | ||
# MessagePack#pack which relies on pure Ruby object recognition | ||
data = LogStash::Util.normalize(event.to_hash) | ||
# timestamp is serialized as a iso8601 string | ||
# merge to avoid modifying data which could have side effects if multiple outputs | ||
@on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])) | ||
@packer.clear | ||
@on_event.call(event, @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])) | ||
end # def encode | ||
|
||
def forwardable_tag(event) | ||
|
@@ -67,6 +82,15 @@ def forwardable_tag(event) | |
|
||
private | ||
|
||
def decode_fluent_time(fluent_time) | ||
case fluent_time | ||
when Fixnum | ||
fluent_time | ||
when EventTime | ||
Time.at(fluent_time.sec, fluent_time.nsec) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are using the latest version of this plugin and are running into a problem, which in our opinion is caused by this line here. As far as I can see, calling Shouldn't this read |
||
end | ||
end | ||
|
||
def decode_event(data, &block) | ||
tag = data[0] | ||
entries = data[1] | ||
|
@@ -80,9 +104,9 @@ def decode_event(data, &block) | |
raise(LogStash::Error, "PackedForward with compression is not supported") | ||
end | ||
|
||
entries_decoder = MessagePack::Unpacker.new | ||
entries_decoder = @decoder | ||
entries_decoder.feed_each(entries) do |entry| | ||
epochtime = entry[0] | ||
epochtime = decode_fluent_time(entry[0]) | ||
map = entry[1] | ||
event = LogStash::Event.new(map.merge( | ||
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), | ||
|
@@ -93,17 +117,17 @@ def decode_event(data, &block) | |
when Array | ||
# Forward | ||
entries.each do |entry| | ||
epochtime = entry[0] | ||
epochtime = decode_fluent_time(entry[0]) | ||
map = entry[1] | ||
event = LogStash::Event.new(map.merge( | ||
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), | ||
"tags" => [ tag ] | ||
)) | ||
yield event | ||
end | ||
when Fixnum | ||
when Fixnum, EventTime | ||
# Message | ||
epochtime = entries | ||
epochtime = decode_fluent_time(entries) | ||
map = data[2] | ||
event = LogStash::Event.new(map.merge( | ||
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
module LogStash; module Codecs; class Fluent; | ||
class EventTime | ||
attr_reader :sec, :nsec | ||
|
||
TYPE = 0 | ||
|
||
def initialize(sec, nsec = 0) | ||
@sec = sec | ||
@nsec = nsec | ||
end | ||
|
||
def to_msgpack(io = nil) | ||
@sec.to_msgpack(io) | ||
end | ||
|
||
def to_msgpack_ext | ||
[@sec, @nsec].pack('NN') | ||
end | ||
|
||
def self.from_msgpack_ext(data) | ||
new(*data.unpack('NN')) | ||
end | ||
|
||
def to_json(*args) | ||
@sec | ||
end | ||
end | ||
end; end; end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to change this to
require_relative "event_time"
to make it work on a new Logstash 6.2.2 setup otherwise you get{:exception=>#<LoadError: no such file to load -- logstash/codecs/fluent/event_time>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Umm..., in my environment,
require_relative "event_time"
does not work. :'(require_relative "fluent/event_time"
andrequire "logstash/codecs/fluent/event_time"
work. 😖There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My plugin setup procedure is:
$ gem build logstash-codec-fluent.gemspec $ logstash-plugin install logstash-codec-fluent-*.gem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I'll do another test tomorrow and I'll let you know.