Skip to content

proxy: go: add context for input methods #10193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

milkrage
Copy link

@milkrage milkrage commented Apr 9, 2025

These changes resolve the following issue: #8464


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
service:
  log_level: debug

pipeline:
  inputs:
    - name: go-input-plugin-with-context
      config-key: config-value-1
      tag: one
    - name: go-input-plugin-with-context
      config-key: config-value-1
      tag: one

  outputs:
    - name: stdout
      match: '*'
  • Go Plugin Example (without .h)
package main

/*
#include <stdlib.h>
*/
import "C"
import (
	"fmt"
	"log"
	"time"
	"unsafe"

	"fluent-bit-input-plugin/input"
)

//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
	return input.FLBPluginRegister(def, "go-input-plugin-with-context", "Go Input Plugin With Context!")
}

//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
	// Get config param
	param := input.FLBPluginConfigKey(plugin, "config-key")
	fmt.Printf("[flb-go][FLBPluginInit] Plugin parameter config-key=%s \n", param)

	// Set config param to context
	input.FLBPluginSetContext(plugin, param)

	return input.FLB_OK
}

//export FLBPluginInputCallbackCtx
func FLBPluginInputCallbackCtx(ctx unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) int {
	// Get config param from context
	value := input.FLBPluginGetContext(ctx).(string)
	log.Printf("[flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=%s \n", value)

	now := time.Now()
	flb_time := input.FLBTime{now}
	message := map[string]string{"message": value}

	entry := []interface{}{flb_time, message}

	enc := input.NewEncoder()
	packed, err := enc.Encode(entry)
	if err != nil {
		fmt.Println("Can't convert to msgpack:", message, err)
		return input.FLB_ERROR
	}

	length := len(packed)
	*data = C.CBytes(packed)
	*size = C.size_t(length)
	// For emitting interval adjustment.
	time.Sleep(5000 * time.Millisecond)

	return input.FLB_OK
}

//export FLBPluginInputCleanupCallback
func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
	return input.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
	return input.FLB_OK
}

func main() {
}
  • Debug log output from testing the change
root@35c092a175d8:/fluent-bit-input-plugin# fluent-bit -e ./input-plugin-with-context.so -c development/fluent-bit.yaml
Fluent Bit v4.0.1
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _             ___  _____ 
|  ___| |                | |   | ___ (_) |           /   ||  _  |
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __/ /| || |/' |
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / /_| ||  /| |
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /\___  |\ |_/ /
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/     |_(_)___/ 


[2025/04/09 20:14:02] [ info] Configuration:
[2025/04/09 20:14:02] [ info]  flush time     | 1.000000 seconds
[2025/04/09 20:14:02] [ info]  grace          | 5 seconds
[2025/04/09 20:14:02] [ info]  daemon         | 0
[2025/04/09 20:14:02] [ info] ___________
[2025/04/09 20:14:02] [ info]  inputs:
[2025/04/09 20:14:02] [ info]      go-input-plugin-with-context
[2025/04/09 20:14:02] [ info]      go-input-plugin-with-context
[2025/04/09 20:14:02] [ info] ___________
[2025/04/09 20:14:02] [ info]  filters:
[2025/04/09 20:14:02] [ info] ___________
[2025/04/09 20:14:02] [ info]  outputs:
[2025/04/09 20:14:02] [ info]      stdout.0
[2025/04/09 20:14:02] [ info] ___________
[2025/04/09 20:14:02] [ info]  collectors:
[2025/04/09 20:14:02] [ info] [fluent bit] version=4.0.1, commit=7505318ca3, pid=9174
[2025/04/09 20:14:02] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2025/04/09 20:14:02] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2025/04/09 20:14:02] [ info] [simd    ] disabled
[2025/04/09 20:14:02] [ info] [cmetrics] version=0.9.9
[2025/04/09 20:14:02] [ info] [ctraces ] version=0.6.2
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] initializing
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] storage_strategy='memory' (memory only)
[flb-go][FLBPluginInit] Plugin parameter config-key=config-value-1 
[2025/04/09 20:14:02] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] [thread init] initialization OK
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] thread instance initialized
[2025/04/09 20:14:02] [debug] [go-input-plugin-with-context:go-input-plugin-with-context.0] created event channels: read=38 write=39
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] initializing
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] storage_strategy='memory' (memory only)
[flb-go][FLBPluginInit] Plugin parameter config-key=config-value-2 
[2025/04/09 20:14:02] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] [thread init] initialization OK
[2025/04/09 20:14:02] [ info] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] thread instance initialized
[2025/04/09 20:14:02] [debug] [go-input-plugin-with-context:go-input-plugin-with-context.1] created event channels: read=55 write=56
[2025/04/09 20:14:02] [debug] [stdout:stdout.0] created event channels: read=59 write=60
[2025/04/09 20:14:02] [debug] [router] match rule go-input-plugin-with-context.0:stdout.0
[2025/04/09 20:14:02] [ info] [output:stdout:stdout.0] worker #0 started
[2025/04/09 20:14:02] [debug] [router] match rule go-input-plugin-with-context.1:stdout.0
[2025/04/09 20:14:02] [ info] [sp] stream processor started
2025/04/09 20:14:03 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
2025/04/09 20:14:03 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-2 
2025/04/09 20:14:08 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-2 
2025/04/09 20:14:08 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
[2025/04/09 20:14:09] [debug] [task] created task=0x7f90b802bed0 id=0 OK
[2025/04/09 20:14:09] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2025/04/09 20:14:09] [debug] [task] created task=0x7f90b802bfa0 id=1 OK
[2025/04/09 20:14:09] [debug] [output:stdout:stdout.0] task_id=1 assigned to thread #0
[0] one: [[1744229643.399235929, {}], {"message"=>"config-value-1"}]
[0] one: [[1744229643.400736182, {}], {"message"=>"config-value-2"}]
[2025/04/09 20:14:09] [debug] [out flush] cb_destroy coro_id=0
[2025/04/09 20:14:09] [debug] [out flush] cb_destroy coro_id=1
[2025/04/09 20:14:09] [debug] [task] destroy task=0x7f90b802bed0 (task_id=0)
[2025/04/09 20:14:09] [debug] [task] destroy task=0x7f90b802bfa0 (task_id=1)
2025/04/09 20:14:13 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-2 
2025/04/09 20:14:13 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
[2025/04/09 20:14:14] [debug] [task] created task=0x7f90b80ac390 id=0 OK
[2025/04/09 20:14:14] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2025/04/09 20:14:14] [debug] [task] created task=0x7f90b80ac460 id=1 OK
[0] one: [[1744229648.404982589, {}], {"message"=>"config-value-1"}]
[2025/04/09 20:14:14] [debug] [output:stdout:stdout.0] task_id=1 assigned to thread #0
[0] one: [[1744229648.404970546, {}], {"message"=>"config-value-2"}]
[2025/04/09 20:14:14] [debug] [out flush] cb_destroy coro_id=2
[2025/04/09 20:14:14] [debug] [out flush] cb_destroy coro_id=3
[2025/04/09 20:14:14] [debug] [task] destroy task=0x7f90b80ac390 (task_id=0)
[2025/04/09 20:14:14] [debug] [task] destroy task=0x7f90b80ac460 (task_id=1)
^C[2025/04/09 20:14:15] [engine] caught signal (SIGINT)
[2025/04/09 20:14:15] [ warn] [engine] service will shutdown in max 5 seconds
[2025/04/09 20:14:15] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] thread pause instance
[2025/04/09 20:14:15] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] thread pause instance
[2025/04/09 20:14:16] [ info] [engine] service has stopped (0 pending tasks)
[2025/04/09 20:14:16] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] thread pause instance
[2025/04/09 20:14:16] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] thread pause instance
[2025/04/09 20:14:16] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2025/04/09 20:14:16] [ info] [output:stdout:stdout.0] thread worker #0 stopped
2025/04/09 20:14:18 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
2025/04/09 20:14:18 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-2 
2025/04/09 20:14:23 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
[2025/04/09 20:14:23] [debug] [GO] running exit callback
[2025/04/09 20:14:23] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.1] thread exit instance
2025/04/09 20:14:28 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
2025/04/09 20:14:33 [flb-go][multiinstance][FLBPluginInputCallbackCtx] Plugin parameter config-key=config-value-1 
[2025/04/09 20:14:38] [debug] [GO] running exit callback
[2025/04/09 20:14:38] [debug] [input:go-input-plugin-with-context:go-input-plugin-with-context.0] thread exit instance
Segmentation fault
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@milkrage
Copy link
Author

milkrage commented Apr 10, 2025

@mp3monster, jfyi (as the author of the issue)

@milkrage milkrage force-pushed the ctx_go_input_plugin branch from 7505318 to 27a727b Compare April 11, 2025 10:23
@milkrage
Copy link
Author

Нello, @edsiper, @leonardo-albertovich! Could you please tell if there is an estimated start date of review of this PR?

This PR would be helpful for others, because before these changes, the context was only available in the output plugin. This significantly limited the capabilities of input plugins in GoLang, since working in multiinstance mode was impossible.

`flb_plugin_proxy_context` was replaced by `flb_plugin_input_proxy_context`

Signed-off-by: milkrage <[email protected]>
This is necessary so that go input plugin can implement methods with
context in the same way as it is done in go output plugin.

Signed-off-by: milkrage <[email protected]>
The way to add context to input methods is completely identical
to the approach used for output methods.

Without this, the callback can only perform hardwired tasks,
and the configuration values can not be accessed. As a result,
the input becomes extremely constrained on what it can do.

Signed-off-by: milkrage <[email protected]>
@milkrage milkrage force-pushed the ctx_go_input_plugin branch from 27a727b to aa810b0 Compare April 22, 2025 18:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant