Skip to content

Commit 72fb725

Browse files
committed
refactor: move version range functions to kpro_api_vsn module
1 parent 77d5941 commit 72fb725

File tree

3 files changed

+70
-66
lines changed

3 files changed

+70
-66
lines changed

src/kpro_api_vsn.erl

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
%% Supported versions of THIS lib
1717
-module(kpro_api_vsn).
18-
-export([range/1, kafka_09_range/1, intersect/2]).
18+
-export([range/1, kafka_09_range/1, intersect/1, intersect/2]).
1919

2020
-export_type([range/0]).
2121

@@ -54,19 +54,76 @@ kafka_09_range(describe_groups) -> {0, 0};
5454
kafka_09_range(list_groups) -> {0, 0};
5555
kafka_09_range(_) -> false.
5656

57-
%% @doc Returns the intersection of two version ranges.
57+
%% @private Returns the intersection of two version ranges.
5858
%% An error is raised if there is no intersection.
59-
-spec intersect(false | range(), false | range()) -> false | range().
60-
intersect(false, _) -> false;
61-
intersect(_, false) -> false;
62-
intersect({Min1, Max1} = R1, {Min2, Max2} = R2) ->
63-
Min = max(Min1, Min2),
64-
Max = min(Max1, Max2),
59+
-spec intersect(atom(), false | range(), false | range()) -> false | range().
60+
intersect(_API, false, _) -> false;
61+
intersect(_API, _, false) -> false;
62+
intersect(API, {Min0, Max0} = Supported, {Min1, Max1} = Received) ->
63+
{Min2, Max2} = fix_range(API, Min1, Max1),
64+
Min = max(Min0, Min2),
65+
Max = min(Max0, Max2),
6566
case Min > Max of
66-
true -> erlang:error({no_intersection, R1, R2});
67+
true -> erlang:error({no_intersection, Supported, Received});
6768
false -> {Min, Max}
6869
end.
6970

71+
%% Special adjustment for received API range.
72+
%% - produce: Minimal version is in fact 3, but Kafka may respond 0.
73+
%% - fetch: Minimal version is in fact 4, but Kafka may respond 0.
74+
fix_range(produce, Min, Max) ->
75+
case Max >= 8 of
76+
true ->
77+
{max(Min, 3), Max};
78+
false ->
79+
{Min, Max}
80+
end;
81+
fix_range(fetch, Min, Max) ->
82+
case Max >= 11 of
83+
true ->
84+
{max(Min, 4), Max};
85+
false ->
86+
{Min, Max}
87+
end;
88+
fix_range(_API, Min, Max) ->
89+
{Min, Max}.
90+
91+
%% @doc Return the intersection of supported version ranges and received version ranges.
92+
-spec intersect(undefined | list()) -> #{atom() => range()}.
93+
intersect(undefined) ->
94+
%% kpro_connection is configured not to query api versions (kafka-0.9)
95+
%% always use minimum supported version in this case
96+
lists:foldl(
97+
fun(API, Acc) ->
98+
case kpro_api_vsn:kafka_09_range(API) of
99+
false -> Acc;
100+
{Min, _Max} -> Acc#{API => {Min, Min}}
101+
end
102+
end, #{}, kpro_schema:all_apis());
103+
intersect(Vsns) ->
104+
maps:fold(
105+
fun(API, {Min, Max}, Acc) ->
106+
case intersect(API, {Min, Max}) of
107+
false -> Acc;
108+
Intersection -> Acc#{API => Intersection}
109+
end
110+
end, #{}, Vsns).
111+
112+
%% @doc Intersect received api version range with supported range.
113+
-spec intersect(kpro:api(), range()) -> range().
114+
intersect(API, Received) ->
115+
Supported = kpro_api_vsn:range(API),
116+
try
117+
intersect(API, Supported, Received)
118+
catch
119+
error : {no_intersection, _, _} ->
120+
Reason = #{reason => incompatible_version_ranges,
121+
supported => Supported,
122+
received => Received,
123+
api => API},
124+
erlang:error(Reason)
125+
end.
126+
70127
%%%_* Emacs ====================================================================
71128
%%% Local Variables:
72129
%%% allout-layout: t

src/kpro_brokers.erl

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ connect_controller(Bootstrap, Config, Opts) ->
116116
{ok, kpro:vsn_ranges()} | {error, any()}.
117117
get_api_versions(Connection) ->
118118
case kpro_connection:get_api_vsns(Connection) of
119-
{ok, Vsns} -> {ok, api_vsn_range_intersection(Vsns)};
119+
{ok, Vsns} -> {ok, kpro_api_vsn:intersect(Vsns)};
120120
{error, Reason} -> {error, Reason}
121121
end.
122122

@@ -269,39 +269,6 @@ discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout) ->
269269
],
270270
kpro_lib:ok_pipe(FL, Timeout).
271271

272-
api_vsn_range_intersection(undefined) ->
273-
%% kpro_connection is configured not to query api versions (kafka-0.9)
274-
%% always use minimum supported version in this case
275-
lists:foldl(
276-
fun(API, Acc) ->
277-
case kpro_api_vsn:kafka_09_range(API) of
278-
false -> Acc;
279-
{Min, _Max} -> Acc#{API => {Min, Min}}
280-
end
281-
end, #{}, kpro_schema:all_apis());
282-
api_vsn_range_intersection(Vsns) ->
283-
maps:fold(
284-
fun(API, {Min, Max}, Acc) ->
285-
case api_vsn_range_intersection(API, {Min, Max}) of
286-
false -> Acc;
287-
Intersection -> Acc#{API => Intersection}
288-
end
289-
end, #{}, Vsns).
290-
291-
%% Intersect received api version range with supported range.
292-
api_vsn_range_intersection(API, Received) ->
293-
Expected = kpro_api_vsn:range(API),
294-
try
295-
kpro_api_vsn:intersect(Expected, Received)
296-
catch
297-
error : {no_intersection, _, _} ->
298-
Reason = #{reason => incompatible_version_ranges,
299-
expected => Expected,
300-
received => Received,
301-
api => API},
302-
erlang:error(Reason)
303-
end.
304-
305272
connect_any([], _Config, Errors) ->
306273
{error, lists:reverse(Errors)};
307274
connect_any([{Host, Port} | Rest], Config, Errors) ->
@@ -332,9 +299,9 @@ api_vsn_range_intersection_test() ->
332299
Received = {0, 0},
333300
?assertError(#{api := API,
334301
reason := incompatible_version_ranges,
335-
expected := _,
302+
supported := _,
336303
received := Received},
337-
api_vsn_range_intersection(API, Received)).
304+
kpro_api_vsn:intersect(API, Received)).
338305

339306
-endif.
340307
%%%_* Emacs ====================================================================

src/kpro_connection.erl

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ query_api_versions(Sock, Mod, ClientId, Deadline) ->
282282
case is_atom(API) of
283283
true ->
284284
%% known API for client
285-
Acc#{API => adjust_vsn(API, MinVsn, MaxVsn)};
285+
Acc#{API => {MinVsn, MaxVsn}};
286286
false ->
287287
%% a broker-only (ClusterAction) API
288288
Acc
@@ -293,26 +293,6 @@ query_api_versions(Sock, Mod, ClientId, Deadline) ->
293293
erlang:error({failed_to_query_api_versions, ErrorCode})
294294
end.
295295

296-
%% Special adjustment for API rage.
297-
%% - produce: Minimal version is in fact 3, but Kafka may respond 0.
298-
%% - fetch: Minimal version is in fact 4, but Kafka may respond 0.
299-
adjust_vsn(produce, Min, Max) ->
300-
case Max >= 8 of
301-
true ->
302-
{max(Min, 3), Max};
303-
false ->
304-
{Min, Max}
305-
end;
306-
adjust_vsn(fetch, Min, Max) ->
307-
case Max >= 11 of
308-
true ->
309-
{max(Min, 4), Max};
310-
false ->
311-
{Min, Max}
312-
end;
313-
adjust_vsn(_API, Min, Max) ->
314-
{Min, Max}.
315-
316296
get_tcp_mod(_SslOpts = true) -> ssl;
317297
get_tcp_mod(_SslOpts = [_|_]) -> ssl;
318298
get_tcp_mod(_) -> gen_tcp.

0 commit comments

Comments
 (0)