Skip to content

enhancement(greptimedb_logs sink): add headers field to greptimedb_logs config #22651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 24, 2025
32 changes: 32 additions & 0 deletions changelog.d/22651_greptimedb_logs_headers.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Added a new `extra_headers` option to `greptimedb_logs` sink configuration to set additional headers for outgoing requests.

change `greptimedb_logs` sink default content type to `application/x-ndjson` to match the default content type of `greptimedb` sink
if you use the greptimedb version v0.12 or earlier, you need to set the content type to `application/json` in the sink configuration

Example:

```yaml
sinks:
greptime_logs:
type: greptimedb_logs
inputs: ["my_source_id"]
endpoint: "http://localhost:4000"
table: "demo_logs"
dbname: "public"
extra_headers:
x-source: vector
```

```toml
[sinks.greptime_logs]
type = "greptimedb_logs"
inputs = ["my_source_id"]
endpoint = "http://localhost:4000"
table = "demo_logs"
dbname = "public"

[sinks.greptime_logs.extra_headers]
x-source = "vector"
```

authors: greptimedb
2 changes: 1 addition & 1 deletion scripts/integration/greptimedb/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ runner:
matrix:
# Temporarily pegging to the latest known stable release
# since using `latest` is failing consistently.
version: [v0.9.5]
version: [v0.13.2]

# changes to these files/paths will invoke the integration test in CI
# expressions are evaluated using https://github.com/micromatch/picomatch
Expand Down
15 changes: 15 additions & 0 deletions src/sinks/greptimedb/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ fn extra_params_examples() -> HashMap<String, String> {
HashMap::<_, _>::from_iter([("source".to_owned(), "vector".to_owned())])
}

fn extra_headers_examples() -> HashMap<String, String> {
HashMap::new()
}

/// Configuration for the `greptimedb_logs` sink.
#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
#[derive(Clone, Debug, Default, Derivative)]
Expand Down Expand Up @@ -96,6 +100,16 @@ pub struct GreptimeDBLogsConfig {
#[configurable(metadata(docs::examples = "extra_params_examples()"))]
pub extra_params: Option<HashMap<String, String>>,

/// Custom headers to add to the HTTP request sent to GreptimeDB.
/// Note that these headers will override the existing headers.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(
docs::additional_props_description = "Extra header key-value pairs."
))]
#[configurable(metadata(docs::examples = "extra_headers_examples()"))]
pub extra_headers: Option<HashMap<String, String>>,

#[configurable(derived)]
#[serde(default)]
pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
Expand Down Expand Up @@ -144,6 +158,7 @@ impl SinkConfig for GreptimeDBLogsConfig {
),
compression: self.compression,
extra_params: self.extra_params.clone(),
extra_headers: self.extra_headers.clone(),
};

let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
Expand Down
9 changes: 8 additions & 1 deletion src/sinks/greptimedb/logs/http_request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub(super) struct GreptimeDBLogsHttpRequestBuilder {
pub(super) encoder: (Transformer, Encoder<Framer>),
pub(super) compression: Compression,
pub(super) extra_params: Option<HashMap<String, String>>,
pub(super) extra_headers: Option<HashMap<String, String>>,
}

fn prepare_log_ingester_url(
Expand Down Expand Up @@ -143,9 +144,15 @@ impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilde
let payload = request.take_payload();

let mut builder = Request::post(&url)
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_TYPE, "application/x-ndjson")
.header(CONTENT_LENGTH, payload.len());

if let Some(extra_headers) = self.extra_headers.as_ref() {
for (key, value) in extra_headers.iter() {
builder = builder.header(key, value);
}
}

if let Some(ce) = self.compression.content_encoding() {
builder = builder.header(CONTENT_ENCODING, ce);
}
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/components/sinks/base/greptimedb_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,21 @@ base: components: sinks: greptimedb_logs: configuration: {
required: true
type: string: examples: ["http://localhost:4000"]
}
extra_headers: {
description: """
Custom headers to add to the HTTP request sent to GreptimeDB.
Note that these headers will override the existing headers.
"""
required: false
type: object: {
examples: [{}]
options: "*": {
description: "Extra header key-value pairs."
required: true
type: string: {}
}
}
}
extra_params: {
description: "Custom parameters to add to the query string for each HTTP request sent to GreptimeDB."
required: false
Expand Down
Loading