Commit 15f7e0b

Feat: added target configuration + event-factory support (#27)
+ Refactor: avoid Fixnum warnings
+ Refactor: review used .rb features
+ Fix: decoding of time's nano-second precision
1 parent 992c7ac commit 15f7e0b

File tree

5 files changed: +152 -60 lines changed

CHANGELOG.md

+4
@@ -1,3 +1,7 @@
+## 3.4.0
+ - Feat: added target configuration + event-factory support [#27](https://github.com/logstash-plugins/logstash-codec-fluent/pull/27)
+ - Fix: decoding of time's nano-second precision
+
 ## 3.3.0
  - Handle EventTime msgpack extension to handle nanosecond precision time and add its parameter [#18](https://github.com/logstash-plugins/logstash-codec-fluent/pull/18)

docs/index.asciidoc

+43 -3
@@ -36,8 +36,48 @@ And from your ruby code in your own application:
     logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
     logger.post("some_tag", { "your" => "data", "here" => "yay!" })
 
-Notes:
 
-* the fluent uses a second-precision time for events, so you will never see
-  subsecond precision on events processed by this codec.
+NOTE: Fluent uses second-precision for events, so you will not see sub-second precision
+on events processed by this codec.
+
+
+[id="plugins-{type}s-{plugin}-options"]
+==== Fluent Codec configuration options
+
+[cols="<,<,<",options="header",]
+|=======================================================================
+|Setting |Input type|Required
+| <<plugins-{type}s-{plugin}-nanosecond_precision>> |<<boolean,boolean>>|No
+| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
+|=======================================================================
+
+&nbsp;
+
+[id="plugins-{type}s-{plugin}-nanosecond_precision"]
+===== `nanosecond_precision`
+
+* Value type is <<boolean,boolean>>
+* Default value is `false`
+
+Enables sub-second level precision while encoding events.
+
+[id="plugins-{type}s-{plugin}-target"]
+===== `target`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+Define the target field for placing the decoded values. If this setting is not
+set, data will be stored at the root (top level) of the event.
+
+For example, if you want data to be put under the `logs` field:
+[source,ruby]
+    input {
+      tcp {
+        codec => fluent {
+          target => "[logs]"
+        }
+        port => 4000
+      }
+    }

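For orientation (this sketch is editorial and not part of the diff): with `target` set, only the decoded record is nested under the named field, while the fluent tag and the timestamp are still written at the top level of the event by the codec's `generate_event` helper (see the codec change below). Roughly, using plain Ruby hashes as stand-ins for Logstash events and reusing the record and tag from the docs example:

    # Illustration only: event shapes with and without the new `target` option.
    record = { "your" => "data", "here" => "yay!" }

    # target unset: decoded fields land at the root of the event
    event_without_target = record.merge("tags" => ["some_tag"])

    # target => "[logs]": decoded fields are nested under "logs";
    # "tags" (and @timestamp) remain top-level fields
    event_with_target = { "logs" => record, "tags" => ["some_tag"] }

    p event_without_target  # {"your"=>"data", "here"=>"yay!", "tags"=>["some_tag"]}
    p event_with_target     # {"logs"=>{"your"=>"data", "here"=>"yay!"}, "tags"=>["some_tag"]}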
lib/logstash/codecs/fluent.rb

+29 -26
@@ -1,9 +1,12 @@
 # encoding: utf-8
 require "logstash/codecs/base"
-require "logstash/util/charset"
+require "logstash/event"
 require "logstash/timestamp"
 require "logstash/util"
 
+require 'logstash/plugin_mixins/event_support/event_factory_adapter'
+require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
+
 # This codec handles fluentd's msgpack schema.
 #
 # For example, you can receive logs from `fluent-logger-ruby` with:
@@ -29,10 +32,20 @@
 class LogStash::Codecs::Fluent < LogStash::Codecs::Base
   require "logstash/codecs/fluent/event_time"
 
+  extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
+
+  include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
+
   config_name "fluent"
 
   config :nanosecond_precision, :validate => :boolean, :default => false
 
+  # Defines a target field for placing decoded fields.
+  # If this setting is omitted, data gets stored at the root (top level) of the event.
+  #
+  # NOTE: the target is only relevant while decoding data into a new event.
+  config :target, :validate => :field_reference
+
   def register
     require "msgpack"
     @factory = MessagePack::Factory.new
@@ -84,10 +97,10 @@ def forwardable_tag(event)
 
   def decode_fluent_time(fluent_time)
     case fluent_time
-    when Fixnum
+    when Integer
       fluent_time
     when EventTime
-      Time.at(fluent_time.sec, fluent_time.nsec)
+      Time.at(fluent_time.sec, fluent_time.nsec / 1000.0)
     end
   end
 
@@ -106,40 +119,30 @@ def decode_event(data, &block)
 
       entries_decoder = @decoder
       entries_decoder.feed_each(entries) do |entry|
-        epochtime = decode_fluent_time(entry[0])
-        map = entry[1]
-        event = LogStash::Event.new(map.merge(
-                  LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
-                  "tags" => [ tag ]
-                ))
-        yield event
+        yield generate_event(entry[1], entry[0], tag)
       end
     when Array
       # Forward
       entries.each do |entry|
-        epochtime = decode_fluent_time(entry[0])
-        map = entry[1]
-        event = LogStash::Event.new(map.merge(
-                  LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
-                  "tags" => [ tag ]
-                ))
-        yield event
+        yield generate_event(entry[1], entry[0], tag)
      end
-    when Fixnum, EventTime
+    when Integer, EventTime
      # Message
-      epochtime = decode_fluent_time(entries)
-      map = data[2]
-      event = LogStash::Event.new(map.merge(
-                LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
-                "tags" => [ tag ]
-              ))
-      yield event
+      yield generate_event(data[2], entries, tag)
    else
      raise(LogStash::Error, "Unknown event type")
    end
  rescue StandardError => e
    @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data)
-    yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ])
+    yield event_factory.new_event("message" => data, "tags" => [ "_fluentparsefailure" ])
+  end
+
+  def generate_event(map, fluent_time, tag)
+    epoch_time = decode_fluent_time(fluent_time)
+    event = targeted_event_factory.new_event(map)
+    event.set(LogStash::Event::TIMESTAMP, LogStash::Timestamp.at(epoch_time))
+    event.tag(tag)
+    event
  end
 
 end # class LogStash::Codecs::Fluent

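A note on the nano-second precision fix above (explanatory sketch, not part of the commit): a fluent `EventTime` carries whole seconds plus a nanosecond remainder, but Ruby's `Time.at(sec, subsec)` treats its second argument as microseconds. Passing the nanosecond value straight through, as the old code did, inflates the sub-second part by a factor of 1000; dividing by 1000.0 first produces the intended instant.

    # Standalone illustration of the conversion; the values are arbitrary.
    sec  = 1_600_000_000
    nsec = 123_456_789                   # what an EventTime extension carries

    wrong = Time.at(sec, nsec)           # nanoseconds misread as microseconds
    right = Time.at(sec, nsec / 1000.0)  # 123_456.789 us == 123_456_789 ns

    p wrong.to_i - sec   # => 123  (the error spills over into whole seconds)
    p right.nsec         # => 123456789

The new spec `decodes timestamp with nanos` exercises this path by packing an EventTime whose nanosecond field carries an extra 123 ns.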
logstash-codec-fluent.gemspec

+3 -1
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name = 'logstash-codec-fluent'
-  s.version = '3.3.0'
+  s.version = '3.4.0'
   s.licenses = ['Apache License (2.0)']
   s.summary = "Reads the `fluentd` `msgpack` schema"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,6 +21,8 @@ Gem::Specification.new do |s|
 
   # Gem dependencies
   s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
+  s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
+  s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
 
   s.platform = 'java'

spec/codecs/fluent_spec.rb

+73 -30
@@ -13,8 +13,20 @@
     @unpacker = @factory.unpacker
   end
 
+  subject { LogStash::Codecs::Fluent.new(config) }
+
+  let(:config) { Hash.new }
+
   let(:properties) { {:name => "foo" } }
   let(:event) { LogStash::Event.new(properties) }
+  let(:timestamp) { event.timestamp }
+  let(:epochtime) { timestamp.to_i }
+  let(:tag) { "mytag" }
+  let(:data) { { 'name' => 'foo', 'number' => 42 } }
+
+  let(:message) do
+    @packer.pack([tag, epochtime, data])
+  end
 
   it "should register without errors" do
     plugin = LogStash::Plugin.lookup("codec", "fluent").new
@@ -36,7 +48,8 @@
   end
 
   describe "event encoding with EventTime" do
-    subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }
+
+    let(:config) { super().merge "nanosecond_precision" => true }
 
     it "should encode as message pack format" do
       subject.on_event do |event, data|
@@ -52,48 +65,88 @@
 
   describe "event decoding" do
 
-    let(:tag) { "mytag" }
-    let(:epochtime) { event.timestamp.to_i }
-    let(:data) { LogStash::Util.normalize(event.to_hash) }
     let(:message) do
-      @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
+      @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)])
     end
 
     it "should decode without errors" do
+      decoded = false
       subject.decode(message) do |event|
         expect(event.get("name")).to eq("foo")
+        decoded = true
+      end
+      expect(decoded).to be true
+    end
+
+    it "should tag event" do
+      subject.decode(message) do |event|
+        expect(event.get("tags")).to eql [ tag ]
       end
     end
 
   end
 
   describe "event decoding with EventTime" do
 
-    let(:tag) { "mytag" }
-    let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(event.timestamp.to_i,
-                                                              event.timestamp.usec * 1000) }
-    let(:data) { LogStash::Util.normalize(event.to_hash) }
-    let(:message) do
-      @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
-    end
+    let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(timestamp.to_i, (timestamp.usec * 1000) + 123) }
+
     subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }
 
     it "should decode without errors" do
+      decoded = false
       subject.decode(message) do |event|
         expect(event.get("name")).to eq("foo")
+        decoded = true
+      end
+      expect(decoded).to be true
+    end
+
+    it "decodes timestamp with nanos" do
+      subject.decode(message) do |event|
+        expect(event.timestamp.to_i).to eql epochtime.sec
+        expect(event.timestamp.usec * 1000 + 123).to eql epochtime.nsec
+      end
+    end
+
+  end
+
+  describe "event decoding with target" do
+
+    let(:tag) { "a_tag" }
+    let(:epochtime) { 123 }
+    let(:data) { LogStash::Util.normalize('name' => 'foo') }
+
+    let(:config) { super().merge "target" => '[bar]' }
+
+    it "should decode without errors" do
+      decoded = false
+      subject.decode(message) do |event|
+        expect(event.include?("name")).to be false
+        expect(event.get("bar")).to eql('name' => "foo")
+        decoded = true
+      end
+      expect(decoded).to be true
+    end
+
+    it "should tag event" do
+      subject.decode(message) do |event|
+        expect(event.get("tags")).to eql [ 'a_tag' ]
+      end
+    end
+
+    it "should set timestamp" do
+      subject.decode(message) do |event|
+        expect(event.timestamp.to_i).to eql(epochtime)
       end
     end
 
   end
 
   describe "forward protocol tag" do
-    let(:event) { LogStash::Event.new(properties) }
-    subject { LogStash::Plugin.lookup("codec", "fluent").new }
 
     describe "when passing Array value" do
       let(:properties) { {:tags => ["test", "logstash"], :name => "foo" } }
 
-
       it "should be joined with '.'" do
         subject.forwardable_tag(event) do |tag|
           expect(tag).to eq("test.logstash")
@@ -104,7 +157,6 @@
     describe "when passing String value" do
       let(:properties) { {:tags => "test.logstash", :name => "foo" } }
 
-
      it "should be pass-through" do
        subject.forwardable_tag(event) do |tag|
          expect(tag).to eq("test.logstash")
@@ -126,15 +178,13 @@
 
   describe "event decoding (buckets of events)" do
 
-    let(:tag) { "mytag" }
-    let(:epochtime) { event.timestamp.to_i }
-    let(:data) { LogStash::Util.normalize(event.to_hash) }
+    let(:data) { LogStash::Util.normalize(event.to_hash) }
     let(:message) do
       @packer.pack([tag,
                     [
-                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
-                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
-                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
+                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)],
+                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)],
+                      [epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)]
                     ]
                    ])
     end
@@ -154,14 +204,7 @@
 
   describe "event decoding (broken package)" do
 
-    let(:tag) { "mytag" }
-    let(:epochtime) { event.timestamp.to_s }
-    let(:data) { LogStash::Util.normalize(event.to_hash) }
-    let(:message) do
-      MessagePack.pack([tag,
-                        epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)
-                       ])
-    end
+    let(:epochtime) { timestamp.to_s }
 
     it "should decode with errors" do
       subject.decode(message) do |event|

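As background for these specs (a sketch assuming only the `msgpack` gem; tag and record values are placeholders): `decode_event` distinguishes the three fluent forward-protocol shapes by the type of the payload's second element. An Integer or EventTime means a single Message, an Array means Forward mode, and a String means PackedForward, i.e. a concatenated msgpack stream of `[time, record]` entries.

    require "msgpack"

    tag    = "mytag"
    time   = Time.now.to_i
    record = { "name" => "foo" }

    # Message mode: [tag, time, record]
    message_mode = MessagePack.pack([tag, time, record])

    # Forward mode: [tag, [[time, record], [time, record], ...]]
    forward_mode = MessagePack.pack([tag, [[time, record], [time, record]]])

    # PackedForward mode: [tag, <concatenated msgpack stream of [time, record] entries>]
    packed_entries = [[time, record], [time, record]].map { |e| MessagePack.pack(e) }.join
    packed_forward_mode = MessagePack.pack([tag, packed_entries])

The specs above feed the Message shape (the shared `message` helper) and the Forward shape (the "buckets of events" group) through `subject.decode`; the String/PackedForward branch is the one handled via `@decoder.feed_each` in the codec.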