Skip to content

Commit 08f2966

Browse files
committed
exporters(otlp): wire in OTLP transport
Signed-off-by: inge4pres <[email protected]>
1 parent 430c5c9 commit 08f2966

File tree

4 files changed

+71
-26
lines changed

4 files changed

+71
-26
lines changed

src/otlp.zig

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,14 @@ const HTTPClient = struct {
340340
const Self = @This();
341341

342342
allocator: std.mem.Allocator,
343-
config: ConfigOptions,
343+
config: *ConfigOptions,
344344
// Default HTTP Client
345345
client: http.Client,
346346
// Retries are processed using a separate thread.
347347
// A priority queue is maintained in the ExpBackoffRetry struct.
348348
retry: *ExpBackoffRetry,
349349

350-
pub fn init(allocator: std.mem.Allocator, config: ConfigOptions) !*Self {
350+
pub fn init(allocator: std.mem.Allocator, config: *ConfigOptions) !*Self {
351351
try config.validate();
352352

353353
const s = try allocator.create(Self);
@@ -370,7 +370,7 @@ const HTTPClient = struct {
370370
self.allocator.destroy(self);
371371
}
372372

373-
fn requestOptions(config: ConfigOptions) !http.Client.RequestOptions {
373+
fn requestOptions(config: *ConfigOptions) !http.Client.RequestOptions {
374374
const headers: http.Client.Request.Headers = .{
375375
.accept_encoding = if (config.compression.encodingHeaderValue()) |v| .{ .override = v } else .default,
376376
.content_type = .{ .override = switch (config.protocol) {
@@ -394,7 +394,7 @@ const HTTPClient = struct {
394394
// Send the OTLP data to the url using the client's configuration.
395395
// Data passed as argument should either be protobuf or JSON encoded, as specified in the config.
396396
// Data will be compressed here.
397-
fn send(self: *Self, url: []const u8, data: []u8) !void {
397+
fn send(self: *Self, url: []const u8, data: []const u8) !void {
398398
var resp_body = std.ArrayList(u8).init(self.allocator);
399399
defer resp_body.deinit();
400400

@@ -701,7 +701,7 @@ test "otlp HTTPClient send fails for missing server" {
701701
var config = try ConfigOptions.init(allocator);
702702
defer config.deinit();
703703

704-
const client = try HTTPClient.init(allocator, config.*);
704+
const client = try HTTPClient.init(allocator, config);
705705
defer client.deinit();
706706

707707
const url = try config.httpUrlForSignal(.metrics, allocator);
@@ -719,7 +719,7 @@ const pbtrace = @import("opentelemetry/proto/trace/v1.pb.zig");
719719
/// Export the data to the OTLP endpoint using the options configured with ConfigOptions.
720720
pub fn Export(
721721
allocator: std.mem.Allocator,
722-
config: ConfigOptions,
722+
config: *ConfigOptions,
723723
otlp_payload: Signal.Data,
724724
) !void {
725725
// Determine the type of client to be used, currently only HTTP is supported.

src/sdk/metrics/exporter.zig

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const Attributes = @import("../../attributes.zig").Attributes;
1313
const InMemoryExporter = @import("./exporters/in_memory.zig").InMemoryExporter;
1414
const StdoutExporter = @import("./exporters/stdout.zig").StdoutExporter;
1515

16+
const otlp = @import("exporters/otlp.zig");
17+
const OTLPExporter = @import("./exporters/otlp.zig").OTLPExporter;
18+
1619
const view = @import("view.zig");
1720

1821
pub const ExportResult = enum {
@@ -90,6 +93,23 @@ pub const MetricExporter = struct {
9093
return .{ .exporter = exporter, .stdout = stdout };
9194
}
9295

96+
pub fn OTLP(
97+
allocator: std.mem.Allocator,
98+
temporality: ?view.TemporalitySelector,
99+
aggregation: ?view.AggregationSelector,
100+
options: *otlp.ConfigOptions,
101+
) !struct { exporter: *MetricExporter, otlp: *OTLPExporter } {
102+
const temporality_ = temporality orelse view.DefaultTemporality;
103+
104+
const otlp_exporter = try OTLPExporter.init(allocator, options, temporality_);
105+
const exporter = try MetricExporter.new(allocator, &otlp_exporter.exporter);
106+
// Default configuration
107+
exporter.temporality = temporality_;
108+
exporter.aggregation = aggregation orelse view.DefaultAggregation;
109+
110+
return .{ .exporter = exporter, .otlp = otlp_exporter };
111+
}
112+
93113
/// ExportBatch exports a batch of metrics data by calling the exporter implementation.
94114
/// The passed metrics data will be owned by the exporter implementation.
95115
//TODO exportBatch MUST have a timeout

src/sdk/metrics/exporters/otlp.zig

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,57 +35,72 @@ pub const OTLPExporter = struct {
3535
allocator: std.mem.Allocator,
3636
exporter: ExporterImpl,
3737

38-
temporailty: view.TemporalitySelector,
38+
temporality: view.TemporalitySelector,
39+
config: *otlp.ConfigOptions,
40+
41+
pub fn init(allocator: std.mem.Allocator, config: *otlp.ConfigOptions, temporality: view.TemporalitySelector) !*Self {
42+
const s = try allocator.create(Self);
43+
s.* = Self{
44+
.allocator = allocator,
45+
.exporter = ExporterImpl{
46+
.exportFn = exportBatch,
47+
},
48+
.temporality = temporality,
49+
.config = config,
50+
};
51+
return s;
52+
}
3953

40-
config: otlp.ConfigOptions,
54+
pub fn deinit(self: *Self) void {
55+
self.allocator.destroy(self);
56+
}
4157

4258
pub fn exportBatch(iface: *ExporterImpl, data: []Measurements) MetricReadError!void {
4359
// Get a pointer to the instance of the struct that implements the interface.
4460
const self: *Self = @fieldParentPtr("exporter", iface);
45-
46-
// TODO: implement the OTLP exporter.
47-
// Processing pipeline:
48-
// 1. convert the measurements to protobuf format (clear up the datapoints after reading them).
49-
// 2. create an HTTP or gRPC client.
50-
// 3. send the data to the endpoint.
51-
// 4. handle the response and potential retries (exp backoff).
52-
61+
// Cleanup the data after use, it is mandatory for all exporters as they own the data argument.
5362
defer {
5463
for (data) |*m| {
5564
m.deinit(self.allocator);
5665
}
5766
self.allocator.free(data);
5867
}
59-
var resource_metrics = try self.allocator.alloc(pbmetrics.ResourceMetrics, 1);
68+
var resource_metrics = self.allocator.alloc(pbmetrics.ResourceMetrics, 1) catch |err| {
69+
std.debug.print("OTLP export failed to allocate memory for resource metrics: {s}\n", .{@errorName(err)});
70+
return MetricReadError.OutOfMemory;
71+
};
6072

6173
var scope_metrics = try self.allocator.alloc(pbmetrics.ScopeMetrics, data.len);
6274
for (data, 0..) |measurement, i| {
75+
const metrics = std.ArrayList(pbmetrics.Metric).initCapacity(self.allocator, 1) catch |err| {
76+
std.debug.print("OTLP export failed to allocate memory for metrics: {s}\n", .{@errorName(err)});
77+
return MetricReadError.OutOfMemory;
78+
};
79+
metrics.items[0] = try toProtobufMetric(self.allocator, measurement, self.temporality);
80+
const attributes = try attributesToProtobufKeyValueList(self.allocator, measurement.meterAttributes);
6381
scope_metrics[i] = pbmetrics.ScopeMetrics{
6482
.scope = pbcommon.InstrumentationScope{
6583
.name = ManagedString.managed(measurement.meterName),
6684
.version = if (measurement.meterVersion) |version| ManagedString.managed(version) else .Empty,
67-
// .schema_url = if (measurement.meterSchemaUrl) |s| ManagedString.managed(s) else .Empty,
68-
.attributes = try attributesToProtobufKeyValueList(self.allocator, measurement.meterAttributes),
85+
.attributes = attributes.values,
6986
},
7087
.schema_url = if (measurement.meterSchemaUrl) |s| ManagedString.managed(s) else .Empty,
71-
.metrics = try toProtobufMetric(self.allocator, measurement, self.temporailty(measurement.instrumentKind)),
88+
.metrics = metrics,
7289
};
7390
}
7491
resource_metrics[0] = pbmetrics.ResourceMetrics{
7592
.resource = null, //FIXME support resource attributes
76-
.scope_metrics = scope_metrics,
93+
.scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).fromOwnedSlice(self.allocator, scope_metrics),
7794
.schema_url = .Empty,
7895
};
7996

8097
const metrics_data = pbmetrics.MetricsData{
8198
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).fromOwnedSlice(self.allocator, resource_metrics),
8299
};
83-
defer metrics_data.deinit(self.allocator);
100+
defer metrics_data.deinit();
84101

85-
// TODO: offload the data the the OTLP transport.
86-
// Problem is: the OTLP transport should be the one in charge of detecting the encoding via the configuration.
87-
otlp.Export(self.config, otlp.Signal{ .metrics = metrics_data }) catch |err| {
88-
std.debug.print("OTLP export failed: {s}", .{@tagName(err)});
102+
otlp.Export(self.allocator, self.config, otlp.Signal.Data{ .metrics = metrics_data }) catch |err| {
103+
std.debug.print("OTLP export failed: {s}", .{@errorName(err)});
89104
return MetricReadError.ExportFailed;
90105
};
91106
}
@@ -298,3 +313,12 @@ test "exporters/otlp conversion for HistogramDataPoint" {
298313
metric.data.?.histogram.data_points.items[0].attributes.items[0].value.?.value.?.string_value,
299314
});
300315
}
316+
317+
test "exporters/otlp init/deinit" {
318+
const allocator = std.testing.allocator;
319+
const config = try otlp.ConfigOptions.init(allocator);
320+
defer config.deinit();
321+
322+
var exporter = try OTLPExporter.init(allocator, config, view.DefaultTemporality);
323+
defer exporter.deinit();
324+
}

src/sdk/metrics/reader.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub const MetricReadError = error{
3434
ExportFailed,
3535
ForceFlushTimedOut,
3636
ConcurrentCollectNotAllowed,
37+
OutOfMemory,
3738
};
3839

3940
/// MetricReader reads metrics' data from a MeterProvider.

0 commit comments

Comments
 (0)