Skip to content

Commit 430c5c9

Browse files
committed
otlp: implement exporting signals' data
Signed-off-by: inge4pres <[email protected]>
1 parent 625a9c5 commit 430c5c9

File tree

3 files changed

+110
-14
lines changed

3 files changed

+110
-14
lines changed

src/api/metrics/instrument.zig

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ fn testCounterAddingOne(counter: *Counter(u64)) !void {
745745
fn testCounterCollect(counter: *Counter(u64)) !void {
746746
// FIXME flaky test can result in failure, so we added a sleep but we should find a more robust solution.
747747
while (!counter.lock.tryLock()) {
748-
std.time.sleep(10);
748+
std.time.sleep(44);
749749
}
750750
counter.lock.unlock();
751751

@@ -790,7 +790,7 @@ fn testHistogramRecordOne(histogram: *Histogram(u64)) !void {
790790
fn testHistogramCollect(histogram: *Histogram(u64)) !void {
791791
// FIXME flaky test can result in failure, so we added a sleep but we should find a more robust solution.
792792
while (!histogram.lock.tryLock()) {
793-
std.time.sleep(10);
793+
std.time.sleep(44);
794794
}
795795
histogram.lock.unlock();
796796

src/otlp.zig

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ pub const ConfigError = error{
2020
InvalidProtocol,
2121
};
2222

23+
/// Error set for the OTLP Export operation.
24+
pub const ExportError = error{
25+
UnimplementedTransportProtocol,
26+
};
27+
2328
/// The combination of underlying transport protocol and format used to send the data.
2429
pub const Protocol = enum {
2530
// In order of precedence: SDK MUST support http/protobuf and SHOULD support grpc and http/json.
@@ -75,15 +80,60 @@ pub const Signal = enum {
7580
// TODO add other signals when implemented
7681
// profiles,
7782

78-
fn defaulttHttpPath(self: Signal) []const u8 {
83+
const Self = @This();
84+
85+
fn defaulttHttpPath(self: Self) []const u8 {
7986
switch (self) {
8087
.metrics => return "/v1/metrics",
8188
.logs => return "/v1/logs",
8289
.traces => return "/v1/traces",
8390
}
8491
}
92+
93+
/// Actual signal data as protobuf messages.
94+
pub const Data = union(Self) {
95+
metrics: pbmetrics.MetricsData,
96+
logs: pblogs.LogsData,
97+
traces: pbtrace.TracesData,
98+
// TODO add other signals when implemented
99+
// profiles: profiles.ProfilesData,
100+
101+
fn toOwnedSlice(self: Data, allocator: std.mem.Allocator, protocol: Protocol) ![]const u8 {
102+
return switch (protocol) {
103+
.http_json => {
104+
switch (self) {
105+
// All protobuf-generated structs have a json_encode method.
106+
inline else => |data| return data.json_encode(.{}, allocator),
107+
}
108+
},
109+
.http_protobuf, .grpc => {
110+
switch (self) {
111+
// All protobuf-generated structs have a encode method.
112+
inline else => |data| return data.encode(allocator),
113+
}
114+
},
115+
};
116+
}
117+
118+
fn signal(self: Data) Signal {
119+
return std.meta.activeTag(self);
120+
}
121+
};
85122
};
86123

124+
test "otlp Signal.Data get payload bytes" {
125+
const allocator = std.testing.allocator;
126+
var data = Signal.Data{
127+
.metrics = pbmetrics.MetricsData{
128+
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator),
129+
},
130+
};
131+
const payload = try data.toOwnedSlice(allocator, Protocol.http_protobuf);
132+
defer allocator.free(payload);
133+
134+
try std.testing.expectEqual(payload.len, 0);
135+
}
136+
87137
/// Configuration options for the OTLP transport.
88138
pub const ConfigOptions = struct {
89139
allocator: std.mem.Allocator,
@@ -293,6 +343,8 @@ const HTTPClient = struct {
293343
config: ConfigOptions,
294344
// Default HTTP Client
295345
client: http.Client,
346+
// Retries are processed using a separate thread.
347+
// A priority queue is maintained in the ExpBackoffRetry struct.
296348
retry: *ExpBackoffRetry,
297349

298350
pub fn init(allocator: std.mem.Allocator, config: ConfigOptions) !*Self {
@@ -339,9 +391,10 @@ const HTTPClient = struct {
339391
return request_options;
340392
}
341393

342-
/// For the Signal type, send the data to the OTLP endpoint using the client's configuration.
343-
/// Data passed as argument should either be protobuf or JSON encoded, as specified in the config.
344-
pub fn send(self: *Self, signal: Signal, data: []u8) !void {
394+
// Send the OTLP data to the url using the client's configuration.
395+
// Data passed as argument should either be protobuf or JSON encoded, as specified in the config.
396+
// Data will be compressed here.
397+
fn send(self: *Self, url: []const u8, data: []u8) !void {
345398
var resp_body = std.ArrayList(u8).init(self.allocator);
346399
defer resp_body.deinit();
347400

@@ -363,9 +416,6 @@ const HTTPClient = struct {
363416
};
364417
defer self.allocator.free(req_body);
365418

366-
const url = try self.config.httpUrlForSignal(signal, self.allocator);
367-
defer self.allocator.free(url);
368-
369419
const req_opts = try requestOptions(self.config);
370420

371421
const fetch_request = http.Client.FetchOptions{
@@ -654,7 +704,41 @@ test "otlp HTTPClient send fails for missing server" {
654704
const client = try HTTPClient.init(allocator, config.*);
655705
defer client.deinit();
656706

707+
const url = try config.httpUrlForSignal(.metrics, allocator);
708+
defer allocator.free(url);
709+
657710
var payload = [_]u8{0} ** 1024;
658-
const result = client.send(Signal.metrics, &payload);
711+
const result = client.send(url, &payload);
659712
try std.testing.expectError(std.posix.ConnectError.ConnectionRefused, result);
660713
}
714+
715+
const pbmetrics = @import("opentelemetry/proto/metrics/v1.pb.zig");
716+
const pblogs = @import("opentelemetry/proto/logs/v1.pb.zig");
717+
const pbtrace = @import("opentelemetry/proto/trace/v1.pb.zig");
718+
719+
/// Export the data to the OTLP endpoint using the options configured with ConfigOptions.
720+
pub fn Export(
721+
allocator: std.mem.Allocator,
722+
config: ConfigOptions,
723+
otlp_payload: Signal.Data,
724+
) !void {
725+
// Determine the type of client to be used, currently only HTTP is supported.
726+
const client = switch (config.protocol) {
727+
.http_json, .http_protobuf => try HTTPClient.init(allocator, config),
728+
.grpc => return ExportError.UnimplementedTransportProtocol,
729+
};
730+
731+
const payload = otlp_payload.toOwnedSlice(allocator, config.protocol) catch |err| {
732+
std.debug.print("OTLP transport: failed to encode payload via {s}: {?}\n", .{ @tagName(config.protocol), err });
733+
return err;
734+
};
735+
defer allocator.free(payload);
736+
737+
const url = try config.httpUrlForSignal(otlp_payload.signal(), allocator);
738+
defer allocator.free(url);
739+
740+
client.send(url, payload) catch |err| {
741+
std.debug.print("OTLP transport: failed to send payload: {?}\n", .{err});
742+
return err;
743+
};
744+
}

src/sdk/metrics/exporters/otlp.zig

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const MetricReadError = @import("../reader.zig").MetricReadError;
2727
const otlp = @import("../../../otlp.zig");
2828

2929
/// Exports metrics via the OpenTelemetry Protocol (OTLP).
30-
/// OTLP is a binary protocol used for transmitting telemetry data, encoding them with protobuf.
30+
/// OTLP is a binary protocol used for transmitting telemetry data, encoding them with protobuf or JSON.
3131
/// See https://opentelemetry.io/docs/specs/otlp/
3232
pub const OTLPExporter = struct {
3333
const Self = @This();
@@ -56,8 +56,7 @@ pub const OTLPExporter = struct {
5656
}
5757
self.allocator.free(data);
5858
}
59-
var output = try self.allocator.create(pbmetrics.ResourceMetrics);
60-
defer output.deinit(self.allocator);
59+
var resource_metrics = try self.allocator.alloc(pbmetrics.ResourceMetrics, 1);
6160

6261
var scope_metrics = try self.allocator.alloc(pbmetrics.ScopeMetrics, data.len);
6362
for (data, 0..) |measurement, i| {
@@ -68,14 +67,27 @@ pub const OTLPExporter = struct {
6867
// .schema_url = if (measurement.meterSchemaUrl) |s| ManagedString.managed(s) else .Empty,
6968
.attributes = try attributesToProtobufKeyValueList(self.allocator, measurement.meterAttributes),
7069
},
70+
.schema_url = if (measurement.meterSchemaUrl) |s| ManagedString.managed(s) else .Empty,
7171
.metrics = try toProtobufMetric(self.allocator, measurement, self.temporailty(measurement.instrumentKind)),
7272
};
7373
}
74-
output.* = pbmetrics.ResourceMetrics{
74+
resource_metrics[0] = pbmetrics.ResourceMetrics{
7575
.resource = null, //FIXME support resource attributes
7676
.scope_metrics = scope_metrics,
7777
.schema_url = .Empty,
7878
};
79+
80+
const metrics_data = pbmetrics.MetricsData{
81+
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).fromOwnedSlice(self.allocator, resource_metrics),
82+
};
83+
defer metrics_data.deinit(self.allocator);
84+
85+
// TODO: offload the data the the OTLP transport.
86+
// Problem is: the OTLP transport should be the one in charge of detecting the encoding via the configuration.
87+
otlp.Export(self.config, otlp.Signal{ .metrics = metrics_data }) catch |err| {
88+
std.debug.print("OTLP export failed: {s}", .{@tagName(err)});
89+
return MetricReadError.ExportFailed;
90+
};
7991
}
8092
};
8193

0 commit comments

Comments
 (0)