Skip to content

Commit b445d3f

Browse files
authored
Merge pull request #922 from aws-powertools/fix/tests-kafka-dependency
chore: update Kafka deserialization to use alias for ConsumerRecords
2 parents 02ce3f8 + 5dfd998 commit b445d3f

10 files changed

+143
-79
lines changed

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/HandlerTests.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
using Avro.Specific;
66
using AWS.Lambda.Powertools.Kafka.Avro;
77

8+
#if DEBUG
9+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
10+
#else
11+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Avro;
12+
#endif
13+
814
namespace AWS.Lambda.Powertools.Kafka.Tests.Avro;
915

1016
public class KafkaHandlerTests
@@ -21,7 +27,7 @@ public async Task Handler_ProcessesKafkaEvent_Successfully()
2127
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaJson));
2228

2329
// Act - Deserialize and process
24-
var kafkaEvent = serializer.Deserialize<ConsumerRecords<int, AvroProduct>>(stream);
30+
var kafkaEvent = serializer.Deserialize<KafkaAlias.ConsumerRecords<int, AvroProduct>>(stream);
2531
var response = await Handler(kafkaEvent, mockContext);
2632

2733
// Assert
@@ -69,7 +75,7 @@ public async Task Handler_ProcessesKafkaEvent_Primitive_Successfully()
6975
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaJson));
7076

7177
// Act - Deserialize and process
72-
var kafkaEvent = serializer.Deserialize<ConsumerRecords<int, string>>(stream);
78+
var kafkaEvent = serializer.Deserialize<KafkaAlias.ConsumerRecords<int, string>>(stream);
7379
var response = await HandlerSimple(kafkaEvent, mockContext);
7480

7581
// Assert
@@ -240,7 +246,7 @@ private string ConvertToAvroBase64(AvroProduct product)
240246
}
241247

242248
// Define the test handler method
243-
private async Task<string> Handler(ConsumerRecords<int, AvroProduct> records, ILambdaContext context)
249+
private async Task<string> Handler(KafkaAlias.ConsumerRecords<int, AvroProduct> records, ILambdaContext context)
244250
{
245251
foreach (var record in records)
246252
{
@@ -251,7 +257,7 @@ private async Task<string> Handler(ConsumerRecords<int, AvroProduct> records, IL
251257
return "Successfully processed Kafka events";
252258
}
253259

254-
private async Task<string> HandlerSimple(ConsumerRecords<int, string> records, ILambdaContext context)
260+
private async Task<string> HandlerSimple(KafkaAlias.ConsumerRecords<int, string> records, ILambdaContext context)
255261
{
256262
foreach (var record in records)
257263
{
@@ -274,7 +280,7 @@ public async Task Handler_ProcessesKafkaEvent_WithAvroKey_Successfully()
274280
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaJson));
275281

276282
// Act - Deserialize and process
277-
var kafkaEvent = serializer.Deserialize<ConsumerRecords<AvroKey, AvroProduct>>(stream);
283+
var kafkaEvent = serializer.Deserialize<KafkaAlias.ConsumerRecords<AvroKey, AvroProduct>>(stream);
278284
var response = await HandlerWithAvroKeys(kafkaEvent, mockContext);
279285

280286
// Assert
@@ -394,7 +400,7 @@ private string ConvertKeyToAvroBase64(AvroKey key)
394400
return Convert.ToBase64String(stream.ToArray());
395401
}
396402

397-
private async Task<string> HandlerWithAvroKeys(ConsumerRecords<AvroKey, AvroProduct> records,
403+
private async Task<string> HandlerWithAvroKeys(KafkaAlias.ConsumerRecords<AvroKey, AvroProduct> records,
398404
ILambdaContext context)
399405
{
400406
foreach (var record in records)

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/PowertoolsKafkaAvroSerializerTests.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
using System.Text;
33
using AWS.Lambda.Powertools.Kafka.Avro;
44

5+
#if DEBUG
6+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
7+
#else
8+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Avro;
9+
#endif
10+
511
namespace AWS.Lambda.Powertools.Kafka.Tests.Avro;
612

713
public class PowertoolsKafkaAvroSerializerTests
@@ -15,7 +21,7 @@ public void Deserialize_KafkaEventWithAvroPayload_DeserializesToCorrectType()
1521
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
1622

1723
// Act
18-
var result = serializer.Deserialize<ConsumerRecords<int, AvroProduct>>(stream);
24+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<int, AvroProduct>>(stream);
1925

2026
// Assert
2127
Assert.NotNull(result);
@@ -54,7 +60,7 @@ public void KafkaEvent_ImplementsIEnumerable_ForDirectIteration()
5460
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
5561

5662
// Act
57-
var result = serializer.Deserialize<ConsumerRecords<int, AvroProduct>>(stream);
63+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<int, AvroProduct>>(stream);
5864

5965
// Assert - Test enumeration
6066
int count = 0;
@@ -91,7 +97,7 @@ public void Primitive_Deserialization()
9197
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
9298

9399
// Act
94-
var result = serializer.Deserialize<ConsumerRecords<string, string>>(stream);
100+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, string>>(stream);
95101
var firstRecord = result.First();
96102
Assert.Equal("Myvalue", firstRecord.Value);
97103
Assert.Equal("MyKey", firstRecord.Key);
@@ -113,7 +119,7 @@ public void DeserializeComplexKey_WhenAllDeserializationMethodsFail_ReturnsExcep
113119
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
114120

115121
Assert.Throws<SerializationException>(() =>
116-
serializer.Deserialize<ConsumerRecords<TestModel, string>>(stream));
122+
serializer.Deserialize<KafkaAlias.ConsumerRecords<TestModel, string>>(stream));
117123
}
118124

119125
private string CreateKafkaEvent(string keyValue, string valueValue)

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/AvroErrorHandlingTests.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
using System.Text;
33
using AWS.Lambda.Powertools.Kafka.Avro;
44

5+
#if DEBUG
6+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
7+
#else
8+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Avro;
9+
#endif
10+
511
namespace AWS.Lambda.Powertools.Kafka.Tests;
612

713
public class AvroErrorHandlingTests
@@ -22,7 +28,7 @@ public void AvroSerializer_WithCorruptedKeyData_ThrowSerializationException()
2228

2329
// Act & Assert
2430
var ex = Assert.Throws<SerializationException>(() =>
25-
serializer.Deserialize<ConsumerRecords<TestModel, string>>(stream));
31+
serializer.Deserialize<KafkaAlias.ConsumerRecords<TestModel, string>>(stream));
2632

2733
Assert.Contains("Failed to deserialize key data", ex.Message);
2834
}
@@ -43,7 +49,7 @@ public void AvroSerializer_WithCorruptedValueData_ThrowSerializationException()
4349

4450
// Act & Assert
4551
var ex = Assert.Throws<SerializationException>(() =>
46-
serializer.Deserialize<ConsumerRecords<string, TestModel>>(stream));
52+
serializer.Deserialize<KafkaAlias.ConsumerRecords<string, TestModel>>(stream));
4753

4854
Assert.Contains("Failed to deserialize value data", ex.Message);
4955
}

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Json/PowertoolsKafkaJsonSerializerTests.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
using System.Text.Json.Serialization;
44
using AWS.Lambda.Powertools.Kafka.Json;
55

6+
#if DEBUG
7+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
8+
#else
9+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Json;
10+
#endif
11+
612
namespace AWS.Lambda.Powertools.Kafka.Tests.Json;
713

814
public class PowertoolsKafkaJsonSerializerTests
@@ -20,7 +26,7 @@ public void Deserialize_KafkaEventWithJsonPayload_DeserializesToCorrectType()
2026
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
2127

2228
// Act
23-
var result = serializer.Deserialize<ConsumerRecords<int, TestModel>>(stream);
29+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<int, TestModel>>(stream);
2430

2531
// Assert
2632
Assert.NotNull(result);
@@ -39,7 +45,7 @@ public void KafkaEvent_ImplementsIEnumerable_ForDirectIteration()
3945
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
4046

4147
// Act
42-
var result = serializer.Deserialize<ConsumerRecords<string, JsonProduct>>(stream);
48+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, JsonProduct>>(stream);
4349

4450
// Assert - Test enumeration
4551
int count = 0;
@@ -74,7 +80,7 @@ public void Primitive_Deserialization()
7480
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
7581

7682
// Act
77-
var result = serializer.Deserialize<ConsumerRecords<string, string>>(stream);
83+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, string>>(stream);
7884
var firstRecord = result.First();
7985
Assert.Equal("Myvalue", firstRecord.Value);
8086
Assert.Equal("MyKey", firstRecord.Key);
@@ -96,7 +102,7 @@ public void DeserializeComplexKey_StandardJsonDeserialization_Works()
96102
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
97103

98104
// Act
99-
var result = serializer.Deserialize<ConsumerRecords<Dictionary<string, object>, string>>(stream);
105+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<Dictionary<string, object>, string>>(stream);
100106

101107
// Assert
102108
var record = result.First();
@@ -126,7 +132,7 @@ public void DeserializeComplexKey_WithSerializerContext_UsesContext()
126132
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
127133

128134
// Act
129-
var result = serializer.Deserialize<ConsumerRecords<TestModel, string>>(stream);
135+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<TestModel, string>>(stream);
130136

131137
// Assert
132138
var record = result.First();
@@ -155,7 +161,7 @@ public void DeserializeComplexValue_WithSerializerContext_UsesContext()
155161
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
156162

157163
// Act
158-
var result = serializer.Deserialize<ConsumerRecords<string, TestModel>>(stream);
164+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, TestModel>>(stream);
159165

160166
// Assert
161167
var record = result.First();
@@ -187,7 +193,7 @@ public void DeserializeComplexValue_WithCustomJsonOptions_RespectsOptions()
187193
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
188194

189195
// Act
190-
var result = serializer.Deserialize<ConsumerRecords<string, JsonProduct>>(stream);
196+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, JsonProduct>>(stream);
191197

192198
// Assert
193199
var record = result.First();
@@ -212,7 +218,7 @@ public void DeserializeComplexValue_WithEmptyData_ReturnsNullOrDefault()
212218
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
213219

214220
// Act
215-
var result = serializer.Deserialize<ConsumerRecords<string, JsonProduct>>(stream);
221+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, JsonProduct>>(stream);
216222

217223
// Assert
218224
var record = result.First();
@@ -243,7 +249,7 @@ public void DeserializeComplexValue_WithContextAndNullResult_ReturnsNull()
243249
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
244250

245251
// Act
246-
var result = serializer.Deserialize<ConsumerRecords<string, TestModel>>(stream);
252+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, TestModel>>(stream);
247253

248254
// Assert
249255
var record = result.First();

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/JsonErrorHandlingTests.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
using System.Text;
33
using AWS.Lambda.Powertools.Kafka.Json;
44

5+
#if DEBUG
6+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
7+
#else
8+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Json;
9+
#endif
10+
511
namespace AWS.Lambda.Powertools.Kafka.Tests;
612

713
public class JsonErrorHandlingTests
@@ -22,7 +28,7 @@ public void JsonSerializer_WithCorruptedKeyData_ThrowSerializationException()
2228

2329
// Act & Assert
2430
var ex = Assert.Throws<SerializationException>(() =>
25-
serializer.Deserialize<ConsumerRecords<Json.TestModel, string>>(stream));
31+
serializer.Deserialize<KafkaAlias.ConsumerRecords<Json.TestModel, string>>(stream));
2632

2733
Assert.Contains("Failed to deserialize key data", ex.Message);
2834
}
@@ -43,7 +49,7 @@ public void JsonSerializer_WithCorruptedValueData_ThrowSerializationException()
4349

4450
// Act & Assert
4551
var ex = Assert.Throws<SerializationException>(() =>
46-
serializer.Deserialize<ConsumerRecords<string, Json.TestModel>>(stream));
52+
serializer.Deserialize<KafkaAlias.ConsumerRecords<string, Json.TestModel>>(stream));
4753

4854
Assert.Contains("Failed to deserialize value data", ex.Message);
4955
}

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/JsonTests.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
using Amazon.Lambda.TestUtilities;
44
using AWS.Lambda.Powertools.Kafka.Json;
55

6+
#if DEBUG
7+
using KafkaAlias = AWS.Lambda.Powertools.Kafka;
8+
#else
9+
using KafkaAlias = AWS.Lambda.Powertools.Kafka.Json;
10+
#endif
11+
612
namespace AWS.Lambda.Powertools.Kafka.Tests;
713

814
public class JsonTests
@@ -31,7 +37,7 @@ public void Given_JsonStreamInput_When_DeserializedWithJsonSerializer_Then_Corre
3137
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
3238

3339
// When
34-
var result = serializer.Deserialize<ConsumerRecords<string, JsonProduct>>(stream);
40+
var result = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, JsonProduct>>(stream);
3541

3642
// Then
3743
Assert.Equal("aws:kafka", result.EventSource);
@@ -47,7 +53,7 @@ public void Given_JsonStreamInput_When_DeserializedWithJsonSerializer_Then_Corre
4753
public void Given_RawUtf8Data_When_ProcessedWithDefaultHandler_Then_DeserializesToStrings()
4854
{
4955
// Given
50-
string Handler(ConsumerRecords<string, string> records, ILambdaContext context)
56+
string Handler(KafkaAlias.ConsumerRecords<string, string> records, ILambdaContext context)
5157
{
5258
foreach (var record in records)
5359
{
@@ -82,7 +88,7 @@ string Handler(ConsumerRecords<string, string> records, ILambdaContext context)
8288

8389
// Use the default serializer which handles base64 → UTF-8 conversion
8490
var serializer = new PowertoolsKafkaJsonSerializer();
85-
var records = serializer.Deserialize<ConsumerRecords<string, string>>(stream);
91+
var records = serializer.Deserialize<KafkaAlias.ConsumerRecords<string, string>>(stream);
8692

8793
// When
8894
var result = Handler(records, mockContext);

0 commit comments

Comments
 (0)