Skip to content

Add gzip compression option for exporter #306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
25 changes: 22 additions & 3 deletions apps/opentelemetry_exporter/src/opentelemetry_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
ssl_options => []}.

-type protocol() :: grpc | http_protobuf | http_json.
-type compression() :: gzip.

-type opts() :: #{endpoints => [endpoint()],
headers => headers(),
Expand All @@ -103,6 +104,7 @@
-record(state, {protocol :: protocol(),
channel_pid :: pid() | undefined,
headers :: headers(),
compression :: compression() | undefined,
grpc_metadata :: map() | undefined,
endpoints :: [endpoint_map()]}).

Expand All @@ -113,14 +115,22 @@ init(Opts) ->
SSLOptions = maps:get(ssl_options, Opts1, undefined),
Endpoints = endpoints(maps:get(endpoints, Opts1, ?DEFAULT_ENDPOINTS), SSLOptions),
Headers = headers(maps:get(headers, Opts1, [])),
Compression = maps:get(compression, Opts1, undefined),
case maps:get(protocol, Opts1, http_protobuf) of
grpc ->
ChannelOpts = maps:get(channel_opts, Opts1, #{}),
case grpcbox_channel:start_link(?MODULE, grpcbox_endpoints(Endpoints), ChannelOpts) of
UpdatedChannelOpts = case Compression of
undefined -> ChannelOpts;
Encoding -> maps:put(encoding, Encoding, ChannelOpts)
end,
case grpcbox_channel:start_link(?MODULE,
grpcbox_endpoints(Endpoints),
UpdatedChannelOpts) of
{ok, ChannelPid} ->
{ok, #state{channel_pid=ChannelPid,
endpoints=Endpoints,
headers=Headers,
compression=Compression,
grpc_metadata=headers_to_grpc_metadata(Headers),
protocol=grpc}};
ErrorOrIgnore ->
Expand All @@ -130,15 +140,18 @@ init(Opts) ->
"to http_protobuf protocol. reason=~p", [ErrorOrIgnore]),
{ok, #state{endpoints=Endpoints,
headers=Headers,
compression=Compression,
protocol=http_protobuf}}
end;
http_protobuf ->
{ok, #state{endpoints=Endpoints,
headers=Headers,
compression=Compression,
protocol=http_protobuf}};
http_json ->
{ok, #state{endpoints=Endpoints,
headers=Headers,
compression=Compression,
protocol=http_json}}
end.

Expand All @@ -147,18 +160,24 @@ export(_Tab, _Resource, #state{protocol=http_json}) ->
{error, unimplemented};
export(Tab, Resource, #state{protocol=http_protobuf,
headers=Headers,
compression=Compression,
endpoints=[#{scheme := Scheme,
host := Host,
path := Path,
port := Port,
ssl_options := SSLOptions} | _]}) ->
Proto = opentelemetry_exporter_trace_service_pb:encode_msg(tab_to_proto(Tab, Resource),
export_trace_service_request),
{NewHeaders, NewProto} = case Compression of
gzip -> {[{"content-encoding", "gzip"} | Headers], zlib:gzip(Proto)};
_ -> {Headers, Proto}
end,
Address = uri_string:normalize(#{scheme => Scheme,
host => Host,
port => Port,
path => Path}),
case httpc:request(post, {Address, Headers, "application/x-protobuf", Proto},

case httpc:request(post, {Address, NewHeaders, "application/x-protobuf", NewProto},
[{ssl, SSLOptions}], []) of
{ok, {{_, Code, _}, _, _}} when Code >= 200 andalso Code =< 202 ->
ok;
Expand Down Expand Up @@ -199,7 +218,7 @@ shutdown(#state{channel_pid=Pid}) ->
%%

grpcbox_endpoints(Endpoints) ->
[{scheme(Scheme), Host, Port, maps:get(ssl_options, Endpoint, [])} ||
[{scheme(Scheme), Host, Port, maps:get(ssl_options, Endpoint, [])} ||
#{scheme := Scheme, host := Host, port := Port} = Endpoint <- Endpoints].

headers_to_grpc_metadata(Headers) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
-include_lib("opentelemetry/include/otel_span.hrl").

all() ->
[{group, functional}, {group, http_protobuf}, {group, grpc}].
[{group, functional}, {group, http_protobuf}, {group, http_protobuf_gzip},
{group, grpc}, {group, grpc_gzip}].

groups() ->
[{functional, [], [configuration, span_round_trip, ets_instrumentation_info]},
{grpc, [], [verify_export]},
{http_protobuf, [], [verify_export]}].
{grpc_gzip, [], [verify_export]},
{http_protobuf, [], [verify_export]},
{http_protobuf_gzip, [], [verify_export]}].

init_per_suite(Config) ->
Config.
Expand All @@ -26,6 +29,12 @@ init_per_group(Group, Config) when Group =:= grpc ;
Group =:= http_protobuf ->
application:ensure_all_started(opentelemetry_exporter),
[{protocol, Group}| Config];
init_per_group(http_protobuf_gzip, Config) ->
application:ensure_all_started(opentelemetry_exporter),
[{protocol, http_protobuf}, {compression, gzip} | Config];
init_per_group(grpc_gzip, Config) ->
application:ensure_all_started(opentelemetry_exporter),
[{protocol, grpc}, {compression, gzip} | Config];
init_per_group(_, _) ->
application:load(opentelemetry_exporter),
ok.
Expand Down Expand Up @@ -212,16 +221,22 @@ span_round_trip(_Config) ->
verify_export(Config) ->
os:putenv("OTEL_RESOURCE_ATTRIBUTES", "service.name=my-test-service,service.version=98da75ea6d38724743bf42b45565049238d86b3f"),
Protocol = ?config(protocol, Config),
Compression = ?config(compression, Config),
Port = case Protocol of
grpc ->
4317;
http_protobuf ->
55681
end,
{ok, State} = opentelemetry_exporter:init(#{protocol => Protocol,
compression => Compression,
endpoints => [{http, "localhost", Port, []}]}),
Tid = ets:new(span_tab, [duplicate_bag, {keypos, #span.instrumentation_library}]),

%% Tempoararily adding this because without this, we would face
%% {error, no_endpoints} when attempt to export when we have more
%% than 1 gprc test case.
timer:sleep(500),
?assertMatch(ok, opentelemetry_exporter:export(Tid, otel_resource:create([]), State)),

TraceId = otel_id_generator:generate_trace_id(),
Expand Down