
Feat: added target configuration + event-factory support #27

Merged (14 commits, Aug 9, 2021)
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 3.4.0
- Feat: added target configuration + event-factory support [#27](https://github.com/logstash-plugins/logstash-codec-fluent/pull/27)
- Fix: decoding of time's nano-second precision

## 3.3.0
- Handle EventTime msgpack extension to handle nanosecond precision time and add its parameter [#18](https://github.com/logstash-plugins/logstash-codec-fluent/pull/18)

41 changes: 41 additions & 0 deletions docs/index.asciidoc
@@ -41,3 +41,44 @@ Notes:
* the fluent uses a second-precision time for events, so you will never see
subsecond precision on events processed by this codec.


I know that this was pre-existing, but we can improve it by using this instead:

NOTE: The fluent uses a second-precision time for events, so you will never see subsecond precision on events processed by this codec.


Also, it seems like a word is missing. "NOTE: The fluent ________ uses a second-precision time..." Should that be the fluent schema? The fluent codec?

Contributor Author:

If it's missing something, then it would be the msgpack schema, but it's a fluent-specific use here ... would this work?

Suggested change
subsecond precision on events processed by this codec.
NOTE: Fluent uses second-precision for events, so you will not see sub-second precision on events processed by this codec.

(honestly not even sure if that statement is true anymore)


Your call. We should move forward to get your other important changes in. Perhaps we open an issue for tracking the validity of the statement?

I've approved, and I trust your judgement on how to proceed. Lemme know if you'd like for me to open an issue.



[id="plugins-{type}s-{plugin}-options"]
==== Fluent Codec configuration options

[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-nanosecond_precision>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
|=======================================================================

&nbsp;

[id="plugins-{type}s-{plugin}-nanosecond_precision"]
===== `nanosecond_precision`

* Value type is <<boolean,boolean>>
* Default value is `false`

Enables sub-second level precision while encoding events.
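
For example, to keep sub-second timestamps when encoding events (mirroring the `tcp` input used in the `target` example below):
[source,ruby]
input {
  tcp {
    codec => fluent {
      nanosecond_precision => true
    }
    port => 4000
  }
}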

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is <<string,string>>
* There is no default value for this setting.

Define the target field for placing the decoded values. If this setting is not
set, data will be stored at the root (top level) of the event.

For example, if you want data to be put under the `logs` field:
[source,ruby]
input {
tcp {
codec => fluent {
target => "[logs]"
}
port => 4000
}
}
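
The effect of `target` can be pictured with plain Hashes standing in for `LogStash::Event` (a sketch only, not the plugin's actual API; field names are illustrative):

```ruby
# Sketch only: plain Hashes stand in for LogStash::Event.
# Without a target, decoded fields land at the event root;
# with target => "[logs]" they nest under the "logs" key.
record = { "name" => "foo", "number" => 42 }

event_at_root     = record.dup                  # fields at the top level
event_with_target = { "logs" => record.dup }    # fields under "logs"

event_at_root["name"]              # => "foo"
event_with_target["logs"]["name"]  # => "foo"
```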

55 changes: 29 additions & 26 deletions lib/logstash/codecs/fluent.rb
@@ -1,9 +1,12 @@
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/util/charset"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"

require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'

# This codec handles fluentd's msgpack schema.
#
# For example, you can receive logs from `fluent-logger-ruby` with:
@@ -29,10 +32,20 @@
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
require "logstash/codecs/fluent/event_time"

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

config_name "fluent"

config :nanosecond_precision, :validate => :boolean, :default => false

# Defines a target field for placing decoded fields.
# If this setting is omitted, data gets stored at the root (top level) of the event.
#
# NOTE: the target is only relevant while decoding data into a new event.
config :target, :validate => :field_reference

def register
require "msgpack"
@factory = MessagePack::Factory.new
@@ -84,10 +97,10 @@ def forwardable_tag(event)

def decode_fluent_time(fluent_time)
case fluent_time
when Fixnum
when Integer
fluent_time
when EventTime
Time.at(fluent_time.sec, fluent_time.nsec)
Time.at(fluent_time.sec, fluent_time.nsec / 1000.0)
end
end
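
The `/ 1000.0` added in the `EventTime` branch matters because Ruby's `Time.at(sec, subsec)` interprets its second argument as microseconds, not nanoseconds; a quick sketch:

```ruby
# Time.at(sec, subsec) treats subsec as MICROseconds, so a
# nanosecond value from an EventTime must be divided by 1000.0
# to avoid inflating the sub-second part by a factor of 1000.
nsec = 123_456_000
t = Time.at(1_600_000_000, nsec / 1000.0)
t.nsec  # => 123456000
t.usec  # => 123456
```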

@@ -106,40 +119,30 @@ def decode_event(data, &block)

entries_decoder = @decoder
entries_decoder.feed_each(entries) do |entry|
epochtime = decode_fluent_time(entry[0])
map = entry[1]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
"tags" => [ tag ]
))
yield event
yield generate_event(entry[1], entry[0], tag)
end
when Array
# Forward
entries.each do |entry|
epochtime = decode_fluent_time(entry[0])
map = entry[1]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
"tags" => [ tag ]
))
yield event
yield generate_event(entry[1], entry[0], tag)
end
when Fixnum, EventTime
when Integer, EventTime
# Message
epochtime = decode_fluent_time(entries)
map = data[2]
event = LogStash::Event.new(map.merge(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
"tags" => [ tag ]
))
yield event
yield generate_event(data[2], entries, tag)
else
raise(LogStash::Error, "Unknown event type")
end
rescue StandardError => e
@logger.error("Fluent parse error, original data now in message field", :error => e, :data => data)
yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ])
yield event_factory.new_event("message" => data, "tags" => [ "_fluentparsefailure" ])
end

def generate_event(map, fluent_time, tag)
epoch_time = decode_fluent_time(fluent_time)
event = targeted_event_factory.new_event(map)
event.set(LogStash::Event::TIMESTAMP, LogStash::Timestamp.at(epoch_time))
event.tag(tag)
event
end

end # class LogStash::Codecs::Fluent
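
The `decode_event` method dispatches on the class of the payload's second element. As an illustrative sketch (tags and record values invented), the three fluentd forward-protocol shapes it distinguishes are:

```ruby
# Illustrative fluentd forward-protocol shapes (values are made up):
message_mode = ["app.log", 1_600_000_000, { "msg" => "hi" }]      # Message: time is Integer (or EventTime)
forward_mode = ["app.log", [[1_600_000_000, { "msg" => "hi" }]]]  # Forward: Array of [time, record] entries
# PackedForward: ["app.log", "<entries packed as msgpack String>"] # handled by the String branch
```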
4 changes: 3 additions & 1 deletion logstash-codec-fluent.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-fluent'
s.version = '3.3.0'
s.version = '3.4.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads the `fluentd` `msgpack` schema"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,6 +21,8 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

s.platform = 'java'

103 changes: 73 additions & 30 deletions spec/codecs/fluent_spec.rb
@@ -13,8 +13,20 @@
@unpacker = @factory.unpacker
end

subject { LogStash::Codecs::Fluent.new(config) }

let(:config) { Hash.new }

let(:properties) { {:name => "foo" } }
let(:event) { LogStash::Event.new(properties) }
let(:timestamp) { event.timestamp }
let(:epochtime) { timestamp.to_i }
let(:tag) { "mytag" }
let(:data) { { 'name' => 'foo', 'number' => 42 } }

let(:message) do
@packer.pack([tag, epochtime, data])
end

it "should register without errors" do
plugin = LogStash::Plugin.lookup("codec", "fluent").new
@@ -36,7 +48,8 @@
end

describe "event encoding with EventTime" do
subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }

let(:config) { super().merge "nanosecond_precision" => true }

it "should encode as message pack format" do
subject.on_event do |event, data|
@@ -52,48 +65,88 @@

describe "event decoding" do

let(:tag) { "mytag" }
let(:epochtime) { event.timestamp.to_i }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)])
end

it "should decode without errors" do
decoded = false
subject.decode(message) do |event|
expect(event.get("name")).to eq("foo")
decoded = true
end
expect(decoded).to be true
end

it "should tag event" do
subject.decode(message) do |event|
expect(event.get("tags")).to eql [ tag ]
end
end

end

describe "event decoding with EventTime" do

let(:tag) { "mytag" }
let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(event.timestamp.to_i,
event.timestamp.usec * 1000) }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
@packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])
end
let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(timestamp.to_i, (timestamp.usec * 1000) + 123) }

subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) }

it "should decode without errors" do
decoded = false
subject.decode(message) do |event|
expect(event.get("name")).to eq("foo")
decoded = true
end
expect(decoded).to be true
end

it "decodes timestamp with nanos" do
subject.decode(message) do |event|
expect(event.timestamp.to_i).to eql epochtime.sec
expect(event.timestamp.usec * 1000 + 123).to eql epochtime.nsec
end
end

end

describe "event decoding with target" do

let(:tag) { "a_tag" }
let(:epochtime) { 123 }
let(:data) { LogStash::Util.normalize('name' => 'foo') }

let(:config) { super().merge "target" => '[bar]' }

it "should decode without errors" do
decoded = false
subject.decode(message) do |event|
expect(event.include?("name")).to be false
expect(event.get("bar")).to eql('name' => "foo")
decoded = true
end
expect(decoded).to be true
end

it "should tag event" do
subject.decode(message) do |event|
expect(event.get("tags")).to eql [ 'a_tag' ]
end
end

it "should set timestamp" do
subject.decode(message) do |event|
expect(event.timestamp.to_i).to eql(epochtime)
end
end

end

describe "forward protocol tag" do
let(:event) { LogStash::Event.new(properties) }
subject { LogStash::Plugin.lookup("codec", "fluent").new }

describe "when passing Array value" do
let(:properties) { {:tags => ["test", "logstash"], :name => "foo" } }


it "should be joined with '.'" do
subject.forwardable_tag(event) do |tag|
expect(tag).to eq("test.logstash")
@@ -104,7 +157,6 @@
describe "when passing String value" do
let(:properties) { {:tags => "test.logstash", :name => "foo" } }


it "should be pass-through" do
subject.forwardable_tag(event) do |tag|
expect(tag).to eq("test.logstash")
@@ -126,15 +178,13 @@

describe "event decoding (buckets of events)" do

let(:tag) { "mytag" }
let(:epochtime) { event.timestamp.to_i }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
@packer.pack([tag,
[
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
[epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)],
[epochtime, data.merge(LogStash::Event::TIMESTAMP => timestamp.to_iso8601)]
]
])
end
@@ -154,14 +204,7 @@

describe "event decoding (broken package)" do

let(:tag) { "mytag" }
let(:epochtime) { event.timestamp.to_s }
let(:data) { LogStash::Util.normalize(event.to_hash) }
let(:message) do
MessagePack.pack([tag,
epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)
])
end
let(:epochtime) { timestamp.to_s }

it "should decode with errors" do
subject.decode(message) do |event|