Skip to content

Commit 99f9c00

Browse files
cosmo0920 authored and jsvd committed
Handle EventTime msgpack extension to handle nanosecond precision time and add its parameter (#18)
* Add EventTime class implementation
* Implement EventTime handling on decode/encode
* Handle EventTime on decode phase correctly
* Add documentation for nanosecond precision time built by EventTime extension
* Multiply * 1000 to make correct nanosecond digit
* Split EventTime class into fluent/event_time.rb
1 parent b452bf9 commit 99f9c00

File tree

3 files changed

+117
-22
lines changed

3 files changed

+117
-22
lines changed

lib/logstash/codecs/fluent.rb

+35-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
# [source,ruby]
1111
# input {
1212
# tcp {
13-
# codec => fluent
13+
# codec => fluent {
14+
# nanosecond_precision => true
15+
# }
1416
# port => 4000
1517
# }
1618
# }
@@ -22,15 +24,23 @@
2224
#
2325
# Notes:
2426
#
25-
# * the fluent uses a second-precision time for events, so you will never see
26-
# subsecond precision on events processed by this codec.
27+
# * to handle EventTime msgpack extension, you must specify nanosecond_precision parameter as true.
2728
#
2829
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
30+
require "logstash/codecs/fluent/event_time"
31+
2932
config_name "fluent"
3033

34+
config :nanosecond_precision, :validate => :boolean, :default => false
35+
3136
def register
3237
require "msgpack"
33-
@decoder = MessagePack::Unpacker.new
38+
@factory = MessagePack::Factory.new
39+
if @nanosecond_precision
40+
@factory.register_type(EventTime::TYPE, EventTime)
41+
end
42+
@packer = @factory.packer
43+
@decoder = @factory.unpacker
3444
end
3545

3646
def decode(data, &block)
@@ -43,14 +53,19 @@ def encode(event)
4353
# Ensure tag to "tag1.tag2.tag3" style string.
4454
# Fluentd cannot handle Array class value in forward protocol's tag.
4555
tag = forwardable_tag(event)
46-
epochtime = event.timestamp.to_i
56+
epochtime = if @nanosecond_precision
57+
EventTime.new(event.timestamp.to_i, event.timestamp.usec * 1000)
58+
else
59+
event.timestamp.to_i
60+
end
4761

4862
# use normalize to make sure returned Hash is pure Ruby for
4963
# MessagePack#pack which relies on pure Ruby object recognition
5064
data = LogStash::Util.normalize(event.to_hash)
5165
# timestamp is serialized as a iso8601 string
5266
# merge to avoid modifying data which could have side effects if multiple outputs
53-
@on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
67+
@packer.clear
68+
@on_event.call(event, @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
5469
end # def encode
5570

5671
def forwardable_tag(event)
@@ -67,6 +82,15 @@ def forwardable_tag(event)
6782

6883
private
6984

85+
def decode_fluent_time(fluent_time)
86+
case fluent_time
87+
when Fixnum
88+
fluent_time
89+
when EventTime
90+
Time.at(fluent_time.sec, fluent_time.nsec)
91+
end
92+
end
93+
7094
def decode_event(data, &block)
7195
tag = data[0]
7296
entries = data[1]
@@ -80,9 +104,9 @@ def decode_event(data, &block)
80104
raise(LogStash::Error, "PackedForward with compression is not supported")
81105
end
82106

83-
entries_decoder = MessagePack::Unpacker.new
107+
entries_decoder = @decoder
84108
entries_decoder.feed_each(entries) do |entry|
85-
epochtime = entry[0]
109+
epochtime = decode_fluent_time(entry[0])
86110
map = entry[1]
87111
event = LogStash::Event.new(map.merge(
88112
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
@@ -93,17 +117,17 @@ def decode_event(data, &block)
93117
when Array
94118
# Forward
95119
entries.each do |entry|
96-
epochtime = entry[0]
120+
epochtime = decode_fluent_time(entry[0])
97121
map = entry[1]
98122
event = LogStash::Event.new(map.merge(
99123
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
100124
"tags" => [ tag ]
101125
))
102126
yield event
103127
end
104-
when Fixnum
128+
when Fixnum, EventTime
105129
# Message
106-
epochtime = entries
130+
epochtime = decode_fluent_time(entries)
107131
map = data[2]
108132
event = LogStash::Event.new(map.merge(
109133
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
module LogStash; module Codecs; class Fluent;
2+
class EventTime
3+
attr_reader :sec, :nsec
4+
5+
TYPE = 0
6+
7+
def initialize(sec, nsec = 0)
8+
@sec = sec
9+
@nsec = nsec
10+
end
11+
12+
def to_msgpack(io = nil)
13+
@sec.to_msgpack(io)
14+
end
15+
16+
def to_msgpack_ext
17+
[@sec, @nsec].pack('NN')
18+
end
19+
20+
def self.from_msgpack_ext(data)
21+
new(*data.unpack('NN'))
22+
end
23+
24+
def to_json(*args)
25+
@sec
26+
end
27+
end
28+
end; end; end

spec/codecs/fluent_spec.rb

+54-11
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,16 @@
22
require_relative "../spec_helper"
33
require "logstash/plugin"
44
require "logstash/event"
5+
require "msgpack"
56

67
describe LogStash::Codecs::Fluent do
8+
before do
9+
@factory = MessagePack::Factory.new
10+
@factory.register_type(LogStash::Codecs::Fluent::EventTime::TYPE,
11+
LogStash::Codecs::Fluent::EventTime)
12+
@packer = @factory.packer
13+
@unpacker = @factory.unpacker
14+
end
715

816
let(:properties) { {:name => "foo" } }
917
let(:event) { LogStash::Event.new(properties) }
@@ -17,9 +25,25 @@
1725

1826
it "should encode as message pack format" do
1927
subject.on_event do |event, data|
20-
fields = MessagePack.unpack(data)
21-
expect(fields[0]).to eq("log")
22-
expect(fields[2]["name"]).to eq("foo")
28+
@unpacker.feed_each(data) do |fields|
29+
expect(fields[0]).to eq("log")
30+
expect(fields[2]["name"]).to eq("foo")
31+
end
32+
end
33+
subject.encode(event)
34+
end
35+
36+
end
37+
38+
describe "event encoding with EventTime" do
39+
subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }
40+
41+
it "should encode as message pack format" do
42+
subject.on_event do |event, data|
43+
@unpacker.feed_each(data) do |fields|
44+
expect(fields[0]).to eq("log")
45+
expect(fields[2]["name"]).to eq("foo")
46+
end
2347
end
2448
subject.encode(event)
2549
end
@@ -32,8 +56,27 @@
3256
let(:epochtime) { event.timestamp.to_i }
3357
let(:data) { LogStash::Util.normalize(event.to_hash) }
3458
let(:message) do
35-
MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
59+
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
60+
end
61+
62+
it "should decode without errors" do
63+
subject.decode(message) do |event|
64+
expect(event.get("name")).to eq("foo")
65+
end
66+
end
67+
68+
end
69+
70+
describe "event decoding with EventTime" do
71+
72+
let(:tag) { "mytag" }
73+
let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(event.timestamp.to_i,
74+
event.timestamp.usec * 1000) }
75+
let(:data) { LogStash::Util.normalize(event.to_hash) }
76+
let(:message) do
77+
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
3678
end
79+
subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }
3780

3881
it "should decode without errors" do
3982
subject.decode(message) do |event|
@@ -87,13 +130,13 @@
87130
let(:epochtime) { event.timestamp.to_i }
88131
let(:data) { LogStash::Util.normalize(event.to_hash) }
89132
let(:message) do
90-
MessagePack.pack([tag,
91-
[
92-
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
93-
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
94-
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
95-
]
96-
])
133+
@packer.pack([tag,
134+
[
135+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
136+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
137+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
138+
]
139+
])
97140
end
98141

99142
it "should decode without errors" do

0 commit comments

Comments
 (0)